commit e6dc4800d8
revital 2023-05-30 10:49:35 +03:00
21 changed files with 1763 additions and 1478 deletions

View File

@ -8,7 +8,7 @@
</br>Experiment Manager, MLOps and Data-Management**
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml.svg)](https://img.shields.io/github/license/allegroai/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)<br>
[![PyPI Downloads](https://pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://join.slack.com/t/clearml/shared_invite/zt-1rp61f0cg-Bu_7UlETQrvHHjw~hEBh5A) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
[![PyPI Downloads](https://pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://join.slack.com/t/clearml/shared_invite/zt-1v74jzwkn-~XsuWB0btXOlfFQCh8DJQw) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
</div>
@ -197,8 +197,7 @@ More information in the [official documentation](https://clear.ml/docs) and [on
For examples and use cases, check the [examples folder](https://github.com/allegroai/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides).
If you have any questions: post on our [Slack Channel](https://join.slack.com/t/clearml/shared_invite/zt-1rp61f0cg-Bu_7UlETQrvHHjw~hEBh5A
), or tag your questions on [stackoverflow](https://stackoverflow.com/questions/tagged/clearml) with '**[clearml](https://stackoverflow.com/questions/tagged/clearml)**' tag (*previously [trains](https://stackoverflow.com/questions/tagged/trains) tag*).
If you have any questions: post on our [Slack Channel](https://join.slack.com/t/clearml/shared_invite/zt-1v74jzwkn-~XsuWB0btXOlfFQCh8DJQw), or tag your questions on [stackoverflow](https://stackoverflow.com/questions/tagged/clearml) with '**[clearml](https://stackoverflow.com/questions/tagged/clearml)**' tag (*previously [trains](https://stackoverflow.com/questions/tagged/trains) tag*).
For feature requests or bug reports, please use [GitHub issues](https://github.com/allegroai/clearml/issues).

View File

@ -470,7 +470,7 @@ class PipelineController(object):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
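(The example body for this directive falls outside the hunk. As a hedged sketch, not the library's verbatim example, a callback in this role takes the pipeline and the completed node; the names below are illustrative.)
def step_completed_callback(pipeline, node):
    # type: (PipelineController, PipelineController.Node) -> None
    # Inspect or adjust the finished step here; node.executed holds the executed Task's ID.
    print("step '{}' completed, task id: {}".format(node.name, node.executed))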
@ -738,7 +738,7 @@ class PipelineController(object):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
@ -862,7 +862,7 @@ class PipelineController(object):
pass
:param Callable step_task_completed_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
@ -951,7 +951,7 @@ class PipelineController(object):
def connect_configuration(self, configuration, name=None, description=None):
# type: (Union[Mapping, list, Path, str], Optional[str], Optional[str]) -> Union[dict, Path, str]
"""
Connect a configuration dictionary or configuration file (pathlib.Path / str) to a the PipelineController object.
Connect a configuration dictionary or configuration file (pathlib.Path / str) to the PipelineController object.
This method should be called before reading the configuration file.
For example, a local file:
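(The file-based example this sentence introduces lies outside the hunk. A hedged usage sketch, with placeholder file and configuration names:)
pipe = PipelineController(name="pipeline demo", project="examples", version="1.0.0")
# Returns a local path to the configuration; when running remotely the content may be
# overridden by the values stored on the PipelineController task.
config_file = pipe.connect_configuration("config.yaml", name="pipeline config")
with open(config_file, "rt") as f:
    print(f.read())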
@ -1373,7 +1373,7 @@ class PipelineController(object):
pass
:param Callable step_task_completed_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
@ -1895,7 +1895,7 @@ class PipelineController(object):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
@ -3644,7 +3644,7 @@ class PipelineDecorator(PipelineController):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and it other jobs are executed. Allows a user to modify the Task status after completion.
and other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py

View File

@ -229,6 +229,7 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
year = {2018},
}
:param eta: float (3)
In each iteration, a complete run of sequential halving is executed. In it,
after evaluating each configuration on the same subset size, only a fraction of

View File

@ -432,7 +432,7 @@ class SearchStrategy(object):
Helper function; implementation is not required. Used by the default ``process_step`` implementation.
Check if the job needs to be aborted or already completed.
If returns ``False``, the job was aborted / completed, and should be taken off the current job list
If returns ``False``, the job was aborted / completed, and should be taken off the current job list.
If there is a budget limitation, this call should update
``self.budget.compute_time.update`` / ``self.budget.iterations.update``
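(As a hedged illustration of the budget bookkeeping this docstring asks for, an override might look like the sketch below; the exact update signature is an assumption based on the attributes named above.)
def monitor_job(self, job):
    # Record elapsed compute time against the budget, then keep the job only while it
    # is still running; returning False takes it off the current job list.
    elapsed_minutes = job.elapsed() / 60.0
    if elapsed_minutes > 0:
        self.budget.compute_time.update(job.task_id(), elapsed_minutes)
    return not job.is_stopped()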
@ -534,6 +534,8 @@ class SearchStrategy(object):
where index 0 is the best performing Task.
Example w/ all_metrics=False:
.. code-block:: py
[
('0593b76dc7234c65a13a301f731958fa',
{
@ -550,6 +552,8 @@ class SearchStrategy(object):
Example w/ all_metrics=True:
.. code-block:: py
[
('0593b76dc7234c65a13a301f731958fa',
{
@ -599,9 +603,8 @@ class SearchStrategy(object):
# type: (int, bool, bool, bool) -> Sequence[(str, dict)]
"""
Return a list of dictionaries of the top performing experiments.
Example: [
{'task_id': Task-ID, 'metrics': scalar-metric-dict, 'hyper_parameters': Hyper-Parameters},
]
Example: ``[{'task_id': Task-ID, 'metrics': scalar-metric-dict, 'hyper_parameters': Hyper-Parameters},]``
Order is based on the controller ``Objective`` object.
:param int top_k: The number of Tasks (experiments) to return.
@ -614,6 +617,8 @@ class SearchStrategy(object):
where index 0 is the best performing Task.
Example w/ all_metrics=False:
.. code-block:: py
[
{
task_id: '0593b76dc7234c65a13a301f731958fa',
@ -632,6 +637,8 @@ class SearchStrategy(object):
Example w/ all_metrics=True:
.. code-block:: py
[
{
task_id: '0593b76dc7234c65a13a301f731958fa',
@ -761,7 +768,8 @@ class SearchStrategy(object):
"""
Set the function used to name a newly created job.
:param callable naming_function:
:param callable naming_function: Callable function for naming a newly created job.
Use the following format:
.. code-block:: py
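(The format block referenced above is outside the hunk. A hedged sketch of such a callable; the two-argument form is an assumption based on this docstring.)
def naming_function(optimizer_objective, params):
    # type: (str, dict) -> str
    # Build a readable job name from the objective value and the sampled hyperparameters.
    return "opt {}: {}".format(optimizer_objective, ", ".join("{}={}".format(k, v) for k, v in params.items()))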
@ -1072,7 +1080,7 @@ class RandomSearch(SearchStrategy):
class HyperParameterOptimizer(object):
"""
Hyper-parameter search controller. Clones the base experiment, changes arguments and tries to maximize/minimize
Hyperparameter search controller. Clones the base experiment, changes arguments and tries to maximize/minimize
the defined objective.
"""
_tag = 'optimization'
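(A hedged construction sketch for the controller described by this class; the base task ID, metric names, and queue are placeholders.)
from clearml.automation import HyperParameterOptimizer, RandomSearch, UniformIntegerParameterRange
optimizer = HyperParameterOptimizer(
    base_task_id="<base-task-id>",   # experiment to clone and mutate
    hyper_parameters=[
        UniformIntegerParameterRange("General/batch_size", min_value=32, max_value=128, step_size=32),
    ],
    objective_metric_title="validation",
    objective_metric_series="loss",
    objective_metric_sign="min",
    optimizer_class=RandomSearch,
    max_number_of_concurrent_tasks=2,
    execution_queue="default",
)
optimizer.start()
optimizer.wait()
optimizer.stop()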
@ -1105,7 +1113,6 @@ class HyperParameterOptimizer(object):
``validation``).
:param str objective_metric_series: The Objective metric series to maximize / minimize (for example, ``loss``).
:param str objective_metric_sign: The objective to maximize / minimize.
The values are:
- ``min`` - Minimize the last reported value for the specified title/series scalar.
@ -1121,23 +1128,20 @@ class HyperParameterOptimizer(object):
default is ``None``, indicating no time limit.
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param bool auto_connect_task: Store optimization arguments and configuration in the Task
:param bool auto_connect_task: Store optimization arguments and configuration in the Task.
The values are:
- ``True`` - The optimization argument and configuration will be stored in the Task. All arguments will
be under the hyper-parameter section ``opt``, and the optimization hyper_parameters space will
be under the hyperparameter section ``opt``, and the optimization hyper_parameters space will be
stored in the Task configuration object section.
- ``False`` - Do not store with Task.
- ``Task`` - A specific Task object to connect the optimization process with.
:param bool always_create_task: Always create a new Task
:param bool always_create_task: Always create a new Task.
The values are:
- ``True`` - No current Task initialized. Create a new task named ``optimization`` in the ``base_task_id``
project.
- ``False`` - Use the :py:meth:`task.Task.current_task` (if exists) to report statistics.
:param str spawn_project: If project name is specified, create all optimization Jobs (Tasks) in the
@ -1505,9 +1509,8 @@ class HyperParameterOptimizer(object):
# type: (int, bool, bool, bool) -> Sequence[(str, dict)]
"""
Return a list of dictionaries of the top performing experiments.
Example: [
{'task_id': Task-ID, 'metrics': scalar-metric-dict, 'hyper_parameters': Hyper-Parameters},
]
Example: ``[{'task_id': Task-ID, 'metrics': scalar-metric-dict, 'hyper_parameters': Hyper-Parameters},]``
Order is based on the controller ``Objective`` object.
:param int top_k: The number of Tasks (experiments) to return.
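(A hedged usage sketch; the method name ``get_top_experiments_details`` is an assumption, since the signature line above does not show it.)
# assuming `optimizer` is a running HyperParameterOptimizer
top = optimizer.get_top_experiments_details(top_k=3, all_metrics=False)
for entry in top:
    print(entry["task_id"], entry["metrics"])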
@ -1520,6 +1523,8 @@ class HyperParameterOptimizer(object):
where index 0 is the best performing Task.
Example w/ all_metrics=False:
.. code-block:: py
[
{
task_id: '0593b76dc7234c65a13a301f731958fa',
@ -1538,6 +1543,8 @@ class HyperParameterOptimizer(object):
Example w/ all_metrics=True:
.. code-block:: py
[
{
task_id: '0593b76dc7234c65a13a301f731958fa',
@ -1615,7 +1622,6 @@ class HyperParameterOptimizer(object):
``validation``).
:param str objective_metric_series: The Objective metric series to maximize / minimize (for example, ``loss``).
:param str objective_metric_sign: The objective to maximize / minimize.
The values are:
- ``min`` - Minimize the last reported value for the specified title/series scalar.

View File

@ -110,7 +110,7 @@ class Parameter(RandomSeed):
class UniformParameterRange(Parameter):
"""
Uniform randomly sampled hyper-parameter object.
Uniform randomly sampled hyperparameter object.
"""
def __init__(
@ -129,8 +129,7 @@ class UniformParameterRange(Parameter):
:param float min_value: The minimum sample to use for uniform random sampling.
:param float max_value: The maximum sample to use for uniform random sampling.
:param float step_size: If not ``None``, set step size (quantization) for value sampling.
:param bool include_max_value: Range includes the ``max_value``
:param bool include_max_value: Range includes the ``max_value``.
The values are:
- ``True`` - The range includes the ``max_value`` (Default)
@ -221,7 +220,7 @@ class LogUniformParameterRange(UniformParameterRange):
class UniformIntegerParameterRange(Parameter):
"""
Uniform randomly sampled integer Hyper-Parameter object.
Uniform randomly sampled integer Hyperparameter object.
"""
def __init__(self, name, min_value, max_value, step_size=1, include_max_value=True):
@ -233,8 +232,7 @@ class UniformIntegerParameterRange(Parameter):
:param int min_value: The minimum sample to use for uniform random sampling.
:param int max_value: The maximum sample to use for uniform random sampling.
:param int step_size: The default step size is ``1``.
:param bool include_max_value: Range includes the ``max_value``
:param bool include_max_value: Range includes the ``max_value``.
The values are:
- ``True`` - Includes the ``max_value`` (Default)
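(A hedged sketch of the two range types documented in this file; the parameter names are placeholders in ClearML's ``Section/name`` convention.)
from clearml.automation import UniformParameterRange, UniformIntegerParameterRange
search_space = [
    # continuous range quantized to steps of 1e-4; max_value is included by default
    UniformParameterRange("General/learning_rate", min_value=1e-4, max_value=1e-1, step_size=1e-4),
    # integer range in steps of 16, excluding the upper bound
    UniformIntegerParameterRange("General/batch_size", min_value=16, max_value=128, step_size=16, include_max_value=False),
]
for parameter in search_space:
    print(parameter.get_value())   # draw one random sample per parameter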

View File

@ -277,15 +277,11 @@ class Session(TokenManager):
return list(retry_codes)
def _load_vaults(self):
# () -> Optional[bool]
def _read_vaults(self):
# () -> Optional[dict]
if not self.check_min_api_version("2.15") or self.feature_set == "basic":
return
if ENV_DISABLE_VAULT_SUPPORT.get():
# (self._logger or get_logger()).debug("Vault support is disabled")
return
def parse(vault):
# noinspection PyBroadException
try:
@ -306,13 +302,23 @@ class Session(TokenManager):
vaults = res.json().get("data", {}).get("vaults", [])
data = list(filter(None, map(parse, vaults)))
if data:
self.config.set_overrides(*data)
return True
return data
elif res.status_code != 404:
raise Exception(res.json().get("meta", {}).get("result_msg", res.text))
except Exception as ex:
(self._logger or get_logger()).warning("Failed getting vaults: {}".format(ex))
def _load_vaults(self):
# () -> Optional[bool]
if ENV_DISABLE_VAULT_SUPPORT.get():
# (self._logger or get_logger()).debug("Vault support is disabled")
return
data = self._read_vaults()
if data:
self.config.set_overrides(*data)
return True
def _apply_config_sections(self, local_logger):
# type: (_LocalLogger) -> None # noqa: F821
default = self.config.get("sdk.apply_environment", False)
@ -649,7 +655,7 @@ class Session(TokenManager):
if session:
active_sessions.append(session)
new_sessions_weakrefs.append(session_weakref)
cls._sessions_weakrefs = session_weakref
cls._sessions_weakrefs = new_sessions_weakrefs
return active_sessions
@classmethod

View File

@ -118,6 +118,13 @@ class HyperParams(object):
item = make_item(i)
props.update({item.name: item})
if self.task.is_offline():
hyperparams = self.task.data.hyperparams or {}
hyperparams.setdefault("properties", tasks.SectionParams())
hyperparams["properties"].update(props)
self.task._save_data_to_offline_dir(hyperparams=hyperparams)
return True
res = self.task.session.send(
tasks.EditHyperParamsRequest(
task=self.task.task_id,

View File

@ -91,6 +91,12 @@ class ScriptRequirements(object):
for fname, lines in sklearn.items():
modules.add('scikit_learn', fname, lines)
# bugfix, replace sklearn with scikit-learn name
if 'skimage' in modules:
skimage = modules.pop('skimage', {})
for fname, lines in skimage.items():
modules.add('scikit_image', fname, lines)
# if we have torch and it supports tensorboard, we should add that as well
# (because it will not be detected automatically)
if 'torch' in modules and 'tensorboard' not in modules and 'tensorboardX' not in modules:

View File

@ -56,6 +56,9 @@ from .repo import ScriptInfo, pip_freeze
from .hyperparams import HyperParams
from ...config import config, PROC_MASTER_ID_ENV_VAR, SUPPRESS_UPDATE_MESSAGE_ENV_VAR, DOCKER_BASH_SETUP_ENV_VAR
from ...utilities.process.mp import SingletonLock
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ...model import BaseModel
class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@ -366,7 +369,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
)
res = self.send(req)
return res.response.id if res else 'offline-{}'.format(str(uuid4()).replace("-", ""))
if res:
return res.response.id
id = "offline-{}".format(str(uuid4()).replace("-", ""))
self._edit(type=tasks.TaskTypeEnum(task_type))
return id
def _set_storage_uri(self, value):
value = value.rstrip('/') if value else None
@ -1374,6 +1383,22 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
execution.model_labels = enumeration
self._edit(execution=execution)
def remove_input_models(self, models_to_remove):
# type: (Sequence[Union[str, BaseModel]]) -> ()
"""
Remove input models from the current task. Note that the models themselves are not deleted,
but the task's references to the models are removed.
To delete the models themselves, see `Models.remove`
:param models_to_remove: The models to remove from the task. Can be a list of ids,
or of `BaseModel` (including its subclasses: `Model` and `InputModel`)
"""
ids_to_remove = [model if isinstance(model, str) else model.id for model in models_to_remove]
with self._edit_lock:
self.reload()
self.data.models.input = [model for model in self.data.models.input if model.model not in ids_to_remove]
self._edit(models=self.data.models)
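(A hedged usage sketch for the new method; the task ID is a placeholder.)
from clearml import Task
task = Task.get_task(task_id="<task-id>")
# Drop every input-model reference; the model objects themselves stay in the model repository.
task.remove_input_models(models_to_remove=task.models["input"])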
def _set_default_docker_image(self):
# type: () -> ()
if not DOCKER_IMAGE_ENV_VAR.exists() and not DOCKER_BASH_SETUP_ENV_VAR.exists():
@ -1928,6 +1953,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
This call is not cached, any call will retrieve all the scalar reports from the back-end.
If the Task has many scalars reported, it might take long for the call to return.
.. note::
Calling this method will return potentially downsampled scalars. The maximum number of returned samples is 5000.
Even when setting `max_samples` to a value larger than 5000, it will be limited to at most 5000 samples.
To fetch all scalar values, see :meth:`Task.get_all_reported_scalars`.
Example:
.. code-block:: py
@ -1937,12 +1967,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
"y": [10, 11 ,12]
}}}
:param int max_samples: Maximum samples per series to return. Default is 0 returning all scalars.
:param int max_samples: Maximum samples per series to return. Default is 0 returning up to 5000 samples.
With sample limit, average scalar values inside sampling window.
:param str x_axis: scalar x_axis, possible values:
'iter': iteration (default), 'timestamp': timestamp as milliseconds since epoch, 'iso_time': absolute time
:return: dict: Nested scalar graphs: dict[title(str), dict[series(str), dict[axis(str), list(float)]]]
"""
if x_axis not in ('iter', 'timestamp', 'iso_time'):
raise ValueError("Scalar x-axis supported values are: 'iter', 'timestamp', 'iso_time'")
@ -1961,6 +1992,57 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return response.response_data
def get_all_reported_scalars(self, x_axis='iter'):
# type: (str) -> Mapping[str, Mapping[str, Mapping[str, Sequence[float]]]]
"""
Return a nested dictionary of all the scalar graphs, containing all the registered samples,
where the first key is the graph title and the second is the series name.
Value is a dict with 'x': values and 'y': values.
To fetch downsampled scalar values, see :meth:`Task.get_reported_scalars`.
.. note::
This call is not cached, any call will retrieve all the scalar reports from the back-end.
If the Task has many scalars reported, it might take long for the call to return.
:param str x_axis: scalar x_axis, possible values:
'iter': iteration (default), 'timestamp': timestamp as milliseconds since epoch, 'iso_time': absolute time
:return: dict: Nested scalar graphs: dict[title(str), dict[series(str), dict[axis(str), list(float)]]]
"""
reported_scalars = {}
batch_size = 1000
scroll_id = None
while True:
response = self.send(
events.GetTaskEventsRequest(
task=self.id, event_type="training_stats_scalar", scroll_id=scroll_id, batch_size=batch_size
)
)
if not response:
return reported_scalars
response = response.wait()
if not response.ok() or not response.response_data:
return reported_scalars
response = response.response_data
for event in response.get("events", []):
metric = event["metric"]
variant = event["variant"]
if x_axis in ["timestamp", "iter"]:
x_val = event[x_axis]
else:
x_val = datetime.utcfromtimestamp(event["timestamp"] / 1000).isoformat(timespec="milliseconds") + "Z"
y_val = event["value"]
reported_scalars.setdefault(metric, {})
reported_scalars[metric].setdefault(variant, {"name": variant, "x": [], "y": []})
if len(reported_scalars[metric][variant]["x"]) == 0 or reported_scalars[metric][variant]["x"][-1] != x_val:
reported_scalars[metric][variant]["x"].append(x_val)
reported_scalars[metric][variant]["y"].append(y_val)
else:
reported_scalars[metric][variant]["y"][-1] = y_val
if response.get("returned", 0) < batch_size or not response.get("scroll_id"):
break
scroll_id = response["scroll_id"]
return reported_scalars
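(A hedged usage sketch of the new method; the task ID is a placeholder.)
from clearml import Task
task = Task.get_task(task_id="<task-id>")
scalars = task.get_all_reported_scalars(x_axis="iter")
for title, series_dict in scalars.items():
    for series, points in series_dict.items():
        print("{} / {}: {} samples".format(title, series, len(points["x"])))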
def get_reported_plots(
self,
max_iterations=None
@ -2440,19 +2522,26 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
"""
return running_remotely() and get_remote_task_id() == self.id
def _edit(self, **kwargs):
# type: (**Any) -> Any
with self._edit_lock:
if self._offline_mode:
def _save_data_to_offline_dir(self, **kwargs):
# type: (**Any) -> ()
for k, v in kwargs.items():
setattr(self.data, k, v)
Path(self.get_offline_mode_folder()).mkdir(parents=True, exist_ok=True)
with open((self.get_offline_mode_folder() / self._offline_filename).as_posix(), "wt") as f:
offline_mode_folder = self.get_offline_mode_folder()
if not offline_mode_folder:
return
Path(offline_mode_folder).mkdir(parents=True, exist_ok=True)
with open((offline_mode_folder / self._offline_filename).as_posix(), "wt") as f:
export_data = self.data.to_dict()
export_data["project_name"] = self.get_project_name()
export_data["offline_folder"] = self.get_offline_mode_folder().as_posix()
export_data["offline_output_models"] = self._offline_output_models
json.dump(export_data, f, ensure_ascii=True, sort_keys=True)
def _edit(self, **kwargs):
# type: (**Any) -> Any
with self._edit_lock:
if self._offline_mode:
self._save_data_to_offline_dir(**kwargs)
return None
# Since we are using forced update, make sure the task status is valid
@ -2574,6 +2663,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
Return the folder where all the task outputs and logs are stored in the offline session.
:return: Path object, local folder, later to be used with `report_offline_session()`
"""
if not self.task_id:
return None
if self._offline_dir:
return self._offline_dir
if not self._offline_mode:

View File

@ -18,7 +18,6 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
__patched = None
__patched_lightning = None
__patched_mmcv = None
__default_checkpoint_filename_counter = {}
@staticmethod
def update_current_task(task, **_):
@ -185,9 +184,9 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
filename = f.name
else:
filename = PatchPyTorchModelIO.__create_default_filename()
filename = PatchPyTorchModelIO.__get_cached_checkpoint_filename()
except Exception:
filename = PatchPyTorchModelIO.__create_default_filename()
filename = PatchPyTorchModelIO.__get_cached_checkpoint_filename()
# give the model a descriptive name based on the file name
# noinspection PyBroadException
@ -195,7 +194,6 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
model_name = Path(filename).stem if filename is not None else None
except Exception:
model_name = None
WeightsFileHandler.create_output_model(
obj, filename, Framework.pytorch, PatchPyTorchModelIO._current_task, singlefile=True, model_name=model_name)
@ -284,11 +282,7 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
return model
@staticmethod
def __create_default_filename():
def __get_cached_checkpoint_filename():
tid = threading.current_thread().ident
checkpoint_filename = PatchPyTorchModelIO._checkpoint_filename.get(tid)
if checkpoint_filename:
return checkpoint_filename
counter = PatchPyTorchModelIO.__default_checkpoint_filename_counter.setdefault(tid, 0)
PatchPyTorchModelIO.__default_checkpoint_filename_counter[tid] += 1
return "default_{}_{}".format(tid, counter)
return checkpoint_filename or None

View File

@ -48,6 +48,10 @@ class PatchedJoblib(object):
joblib.numpy_pickle.NumpyPickler.__init__ = _patched_call(
joblib.numpy_pickle.NumpyPickler.__init__,
PatchedJoblib._numpypickler)
joblib.memory.MemorizedFunc._cached_call = _patched_call(
joblib.memory.MemorizedFunc._cached_call,
PatchedJoblib._cached_call_recursion_guard
)
if not PatchedJoblib._patched_sk_joblib and 'sklearn' in sys.modules:
PatchedJoblib._patched_sk_joblib = True
@ -194,3 +198,8 @@ class PatchedJoblib(object):
"Can't get model framework {}, model framework will be: {} ".format(object_orig_module, framework))
finally:
return framework
@staticmethod
def _cached_call_recursion_guard(original_fn, *args, **kwargs):
# used just to avoid getting into the `_load` binding in the context of memory caching
return original_fn(*args, **kwargs)

View File

@ -553,12 +553,12 @@ def ds_search(args):
+ str(id_col_len)
+ "}"
)
print(formatting.format("project", "name", "tags", "created", "id"))
print(formatting.format("project", "name", "version", "tags", "created", "id"))
print("-" * len(formatting.format("-", "-", "-", "-", "-")))
for d in datasets:
print(
formatting.format(
d["project"], d["name"], str(d["tags"] or [])[1:-1], str(d["created"]).split(".")[0], d["id"]
d["project"], d["name"], d["version"], str(d["tags"] or [])[1:-1], str(d["created"]).split(".")[0], d["id"]
)
)
return 0

View File

@ -122,12 +122,14 @@ class Dataset(object):
__hyperparams_section = "Datasets"
__datasets_runtime_prop = "datasets"
__orig_datasets_runtime_prop_prefix = "orig_datasets"
__preview_media_max_file_size = deferred_config("dataset.preview.media.max_file_size", 5 * 1024 * 1024, transform=int)
__preview_tabular_table_count = deferred_config("dataset.preview.tabular.table_count", 10, transform=int)
__preview_tabular_row_count = deferred_config("dataset.preview.tabular.row_count", 10, transform=int)
__preview_media_image_count = deferred_config("dataset.preview.media.image_count", 10, transform=int)
__preview_media_video_count = deferred_config("dataset.preview.media.video_count", 10, transform=int)
__preview_media_audio_count = deferred_config("dataset.preview.media.audio_count", 10, transform=int)
__preview_media_html_count = deferred_config("dataset.preview.media.html_count", 10, transform=int)
__preview_media_json_count = deferred_config("dataset.preview.media.json_count", 10, transform=int)
_dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int)
def __init__(
@ -191,7 +193,7 @@ class Dataset(object):
if "/.datasets/" not in task.get_project_name() or "":
dataset_project, parent_project = self._build_hidden_project_name(task.get_project_name(), task.name)
task.move_to_project(new_project_name=dataset_project)
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
get_or_create_project(
task.session,
@ -202,9 +204,21 @@ class Dataset(object):
else:
self._created_task = True
dataset_project, parent_project = self._build_hidden_project_name(dataset_project, dataset_name)
if not Dataset.is_offline():
task = Task.create(
project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
else:
task = Task.init(
project_name=dataset_project,
task_name=dataset_name,
task_type=Task.TaskTypes.data_processing,
reuse_last_task_id=False,
auto_connect_frameworks=False,
auto_connect_arg_parser=False,
auto_resource_monitoring=False,
auto_connect_streams=False
)
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
get_or_create_project(
task.session,
@ -218,6 +232,7 @@ class Dataset(object):
if dataset_tags:
task.set_tags((task.get_tags() or []) + list(dataset_tags))
task.mark_started()
if not Dataset.is_offline():
# generate the script section
script = (
"from clearml import Dataset\n\n"
@ -233,7 +248,6 @@ class Dataset(object):
task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
# noinspection PyProtectedMember
task._edit(script=task.data.script)
# if the task is running make sure we ping to the server so it will not be aborted by a watchdog
self._task_pinger = DevWorker()
self._task_pinger.register(task, stop_signal_support=False)
@ -279,6 +293,7 @@ class Dataset(object):
self.__preview_video_count = 0
self.__preview_audio_count = 0
self.__preview_html_count = 0
self.__preview_json_count = 0
@property
def id(self):
@ -309,6 +324,7 @@ class Dataset(object):
# type: () -> Mapping[str, LinkEntry]
"""
Notice this call returns an internal representation, do not modify!
:return: dict with relative file path as key, and LinkEntry as value
"""
return self._dataset_link_entries
@ -321,7 +337,7 @@ class Dataset(object):
@property
def name(self):
# type: () -> str
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
return self._task.get_project_name().partition("/.datasets/")[-1]
return self._task.name
@ -464,8 +480,8 @@ class Dataset(object):
else:
if len(dataset_path) != len(source_url):
raise ValueError(
f"dataset_path must be a string or a list of strings with the same length as source_url"
f" (received {len(dataset_path)} paths for {len(source_url)} source urls))"
"dataset_path must be a string or a list of strings with the same length as source_url"
" (received {} paths for {} source urls))".format(len(dataset_path), len(source_url))
)
dataset_paths = dataset_path
with ThreadPoolExecutor(max_workers=max_workers) as tp:
@ -628,6 +644,7 @@ class Dataset(object):
If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour)
:param max_workers: Numbers of threads to be spawned when zipping and uploading the files.
If None (default) it will be set to:
- 1: if the upload destination is a cloud provider ('s3', 'gs', 'azure')
- number of logical cores: otherwise
:param int retries: Number of retries before failing to upload each zip. If 0, the upload is not retried.
@ -635,6 +652,9 @@ class Dataset(object):
:raise: If the upload failed (i.e. at least one zip failed to upload), raise a `ValueError`
"""
self._report_dataset_preview()
if Dataset.is_offline():
self._serialize()
return
# set output_url
if output_url:
@ -642,7 +662,11 @@ class Dataset(object):
self._task.get_logger().set_default_upload_destination(output_url)
if not max_workers:
max_workers = 1 if self._task.output_uri.startswith(tuple(cloud_driver_schemes)) else psutil.cpu_count()
max_workers = (
1
if self._task.output_uri and self._task.output_uri.startswith(tuple(cloud_driver_schemes))
else psutil.cpu_count()
)
self._task.get_logger().report_text(
"Uploading dataset files: {}".format(
@ -774,6 +798,9 @@ class Dataset(object):
:param raise_on_error: If True, raise exception if dataset finalizing failed
:param auto_upload: Automatically upload dataset if not called yet, will upload to default location.
"""
if Dataset.is_offline():
LoggerRoot.get_base_logger().warning("Cannot finalize dataset in offline mode.")
return
# check we do not have files waiting for upload.
if self._dirty:
if auto_upload:
@ -814,7 +841,7 @@ class Dataset(object):
# type: (Union[numpy.array, pd.DataFrame, Dict[str, Any]], str, bool) -> () # noqa: F821
"""
Attach a user-defined metadata to the dataset. Check `Task.upload_artifact` for supported types.
If type is Optionally make it visible as a table in the UI.
If the type is a pandas DataFrame, optionally make it visible as a table in the UI.
"""
if metadata_name.startswith(self.__data_entry_name_prefix):
raise ValueError("metadata_name can not start with '{}'".format(self.__data_entry_name_prefix))
@ -905,6 +932,8 @@ class Dataset(object):
:return: A base folder for the entire dataset
"""
assert self._id
if Dataset.is_offline():
raise ValueError("Cannot get dataset local copy in offline mode.")
if not self._task:
self._task = Task.get_task(task_id=self._id)
if not self.is_final():
@ -950,6 +979,8 @@ class Dataset(object):
:return: The target folder containing the entire dataset
"""
assert self._id
if Dataset.is_offline():
raise ValueError("Cannot get dataset local copy in offline mode.")
max_workers = max_workers or psutil.cpu_count()
target_folder = Path(target_folder).absolute()
target_folder.mkdir(parents=True, exist_ok=True)
@ -1204,7 +1235,7 @@ class Dataset(object):
:return: Newly created Dataset object
"""
if not Session.check_min_api_server_version("2.13"):
if not Dataset.is_offline() and not Session.check_min_api_server_version("2.13"):
raise NotImplementedError("Datasets are not supported with your current ClearML server version. Please update your server.")
parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
@ -1264,7 +1295,7 @@ class Dataset(object):
if description:
instance.set_description(description)
# noinspection PyProtectedMember
if output_uri and not Task._offline_mode:
if output_uri and not Dataset.is_offline():
# noinspection PyProtectedMember
instance._task.output_uri = output_uri
# noinspection PyProtectedMember
@ -1283,20 +1314,13 @@ class Dataset(object):
instance._serialize()
# noinspection PyProtectedMember
instance._report_dataset_struct()
if not Dataset.is_offline():
# noinspection PyProtectedMember
instance._task.get_logger().report_text(
"ClearML results page: {}".format(instance._task.get_output_log_web_page())
)
if bool(Session.check_min_api_server_version(cls.__min_api_version)):
instance._task.get_logger().report_text( # noqa
"ClearML dataset page: {}".format(
"{}/datasets/simple/{}/experiments/{}".format(
instance._task._get_app_server(), # noqa
instance._task.project if instance._task.project is not None else "*", # noqa
instance._task.id, # noqa
)
)
)
# noinspection PyProtectedMember
instance._log_dataset_page()
# noinspection PyProtectedMember
instance._task.flush(wait_for_uploads=True)
# noinspection PyProtectedMember
@ -1499,6 +1523,8 @@ class Dataset(object):
:param dataset_project: The project the datasets to be renamed belongs to
:param dataset_name: The name of the datasets (before renaming)
"""
if Dataset.is_offline():
raise ValueError("Cannot rename dataset in offline mode")
if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
LoggerRoot.get_base_logger().warning(
"Could not rename dataset because API version < {}".format(cls.__min_api_version)
@ -1544,6 +1570,8 @@ class Dataset(object):
:param dataset_project: Project of the dataset(s) to move to new project
:param dataset_name: Name of the dataset(s) to move to new project
"""
if cls.is_offline():
raise ValueError("Cannot move dataset project in offlime mode")
if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
LoggerRoot.get_base_logger().warning(
"Could not move dataset to another project because API version < {}".format(cls.__min_api_version)
@ -1618,6 +1646,9 @@ class Dataset(object):
:return: Dataset object
"""
if Dataset.is_offline():
raise ValueError("Cannot get dataset in offline mode.")
system_tags = ["__$all", cls.__tag]
if not include_archived:
system_tags = ["__$all", cls.__tag, "__$not", "archived"]
@ -1757,6 +1788,7 @@ class Dataset(object):
"""
Return a Logger object for the Dataset, allowing users to report statistics metrics
and debug samples on the Dataset itself
:return: Logger object
"""
return self._task.get_logger()
@ -1801,6 +1833,9 @@ class Dataset(object):
Examples: `s3://bucket/data`, `gs://bucket/data` , `azure://bucket/data` , `/mnt/share/data`
:return: Newly created dataset object.
"""
if Dataset.is_offline():
raise ValueError("Cannot squash datasets in offline mode")
mutually_exclusive(dataset_ids=dataset_ids, dataset_project_name_pairs=dataset_project_name_pairs)
datasets = [cls.get(dataset_id=d) for d in dataset_ids] if dataset_ids else \
[cls.get(dataset_project=pair[0], dataset_name=pair[1]) for pair in dataset_project_name_pairs]
@ -1877,7 +1912,7 @@ class Dataset(object):
type=[str(Task.TaskTypes.data_processing)],
tags=tags or None,
status=["stopped", "published", "completed", "closed"] if only_completed else None,
only_fields=["created", "id", "name", "project", "tags"],
only_fields=["created", "id", "name", "project", "tags", "runtime"],
search_hidden=True,
exact_match_regex_flag=False,
_allow_extra_fields_=True,
@ -1892,6 +1927,7 @@ class Dataset(object):
"project": cls._remove_hidden_part_from_dataset_project(project_id_lookup[d.project]),
"id": d.id,
"tags": d.tags,
"version": d.runtime.get("version")
}
for d in datasets
]
@ -2028,6 +2064,10 @@ class Dataset(object):
for k, parents in self._dependency_graph.items() if k in used_dataset_versions}
# make sure we do not remove our parents, for genealogy's sake
self._dependency_graph[self._id] = current_parents
if not Dataset.is_offline():
to_delete = [k for k in self._dependency_graph.keys() if k.startswith("offline-")]
for k in to_delete:
del self._dependency_graph[k]
def _serialize(self, update_dependency_chunk_lookup=False):
# type: (bool) -> ()
@ -2609,6 +2649,89 @@ class Dataset(object):
"""
return 'dsh{}'.format(md5text(dataset_id))
@classmethod
def is_offline(cls):
# type: () -> bool
"""
Return the offline-mode state. If in offline mode, no communication to the backend is enabled.
:return: boolean offline-mode state
"""
return Task.is_offline()
@classmethod
def set_offline(cls, offline_mode=False):
# type: (bool) -> None
"""
Set offline mode, where all data and logs are stored in a local folder for later transmission
:param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled.
"""
Task.set_offline(offline_mode=offline_mode)
def get_offline_mode_folder(self):
# type: () -> Optional[Path]
"""
Return the folder where all the dataset data is stored in the offline session.
:return: Path object, local folder
"""
return self._task.get_offline_mode_folder()
@classmethod
def import_offline_session(cls, session_folder_zip, upload=True, finalize=False):
# type: (str, bool, bool) -> str
"""
Import an offline session of a dataset.
Includes repository details, installed packages, artifacts, logs, metric and debug samples.
:param session_folder_zip: Path to a folder containing the session, or zip-file of the session folder.
:param upload: If True, upload the dataset's data
:param finalize: If True, finalize the dataset
:return: The ID of the imported dataset
"""
id = Task.import_offline_session(session_folder_zip)
dataset = Dataset.get(dataset_id=id)
# note that there can only be one offline session in the dependency graph: our session
# noinspection PyProtectedMember
dataset._dependency_graph = {
(id if k.startswith("offline-") else k): [(id if sub_v.startswith("offline-") else sub_v) for sub_v in v]
for k, v in dataset._dependency_graph.items() # noqa
}
# noinspection PyProtectedMember
dataset._update_dependency_graph()
# noinspection PyProtectedMember
dataset._log_dataset_page()
started = False
if upload or finalize:
started = True
# noinspection PyProtectedMember
dataset._task.mark_started(force=True)
if upload:
dataset.upload()
if finalize:
dataset.finalize()
if started:
# noinspection PyProtectedMember
dataset._task.mark_completed()
return id
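(A hedged end-to-end sketch of the offline dataset flow added here; paths and names are placeholders, and the exact hand-off of the offline session folder may differ.)
from clearml import Dataset
Dataset.set_offline(offline_mode=True)
ds = Dataset.create(dataset_name="offline-example", dataset_project="examples")
ds.add_files("./data")
ds.upload()                      # offline: state is serialized locally, nothing is sent
session_folder = ds.get_offline_mode_folder()
# later, with server access restored, replay the session
Dataset.set_offline(offline_mode=False)
new_id = Dataset.import_offline_session(session_folder.as_posix(), upload=True, finalize=True)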
def _log_dataset_page(self):
if bool(Session.check_min_api_server_version(self.__min_api_version)):
self._task.get_logger().report_text(
"ClearML dataset page: {}".format(
"{}/datasets/simple/{}/experiments/{}".format(
self._task._get_app_server(),
self._task.project if self._task.project is not None else "*",
self._task.id,
)
)
)
def _build_dependency_chunk_lookup(self):
# type: () -> Dict[str, int]
"""
@ -2850,7 +2973,10 @@ class Dataset(object):
dependency_graph_ex[id_] = parents
task = Task.get_task(task_id=id_)
dataset_struct_entry = {"job_id": id_, "status": task.status}
dataset_struct_entry = {
"job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9
"status": task.status
}
# noinspection PyProtectedMember
last_update = task._get_last_update()
if last_update:
@ -2964,7 +3090,7 @@ class Dataset(object):
except Exception:
pass
continue
if compression:
if compression or os.path.getsize(file_path) > self.__preview_media_max_file_size:
continue
guessed_type = mimetypes.guess_type(file_path)
if not guessed_type or not guessed_type[0]:
@ -2982,6 +3108,9 @@ class Dataset(object):
elif guessed_type == "text/html" and self.__preview_html_count < self.__preview_media_html_count:
self._task.get_logger().report_media("HTML", file_name, local_path=file_path)
self.__preview_html_count += 1
elif guessed_type == "application/json" and self.__preview_json_count < self.__preview_media_json_count:
self._task.get_logger().report_media("JSON", file_name, local_path=file_path, file_extension=".txt")
self.__preview_json_count += 1
@classmethod
def _set_project_system_tags(cls, task):
@ -3366,7 +3495,7 @@ class Dataset(object):
if not dataset_project:
return None, None
project_name = cls._remove_hidden_part_from_dataset_project(dataset_project)
if bool(Session.check_min_api_server_version(cls.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(cls.__min_api_version)):
parent_project = "{}.datasets".format(dataset_project + "/" if dataset_project else "")
if dataset_name:
project_name = "{}/{}".format(parent_project, dataset_name)

View File

@ -1067,7 +1067,7 @@ class Logger(object):
:param str uri: example: 's3://bucket/directory/' or 'file:///tmp/debug/'
:return: True, if the destination scheme is supported (for example, ``s3://``, ``file://``, or ``gc://``).
:return: True, if the destination scheme is supported (for example, ``s3://``, ``file://``, or ``gs://``).
False, if not supported.
"""

File diff suppressed because it is too large

View File

@ -44,7 +44,7 @@ def get_config_object_matcher(**patterns):
def quote_url(url):
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
if parsed.scheme not in ("http", "https", "gs"):
return url
parsed = parsed._replace(path=quote(parsed.path))
return urlunparse(parsed)

View File

@ -1692,6 +1692,15 @@ class Task(_Task):
dist.init_process_group('gloo')
run(config.get('node_rank'), config.get('total_num_nodes'))
When using the ClearML cloud autoscaler apps, one needs to make sure the nodes can reach each other.
The machines need to be in the same security group, the `MASTER_PORT` needs to be exposed and the
`MASTER_ADDR` needs to be the right private ip of the instance the master is running on.
For example, to achieve this, one can set the following Docker arguments in the `Additional ClearML Configuration` section:
.. code-block:: py
agent.extra_docker_arguments=["--ipc=host", "--network=host", "-p", "29500:29500", "--env", "CLEARML_MULTI_NODE_MASTER_DEF_ADDR=`hostname -I | awk '{print $1}'`"]
:param total_num_nodes: The total number of nodes to be enqueued, including the master node,
which should already be enqueued when running remotely
:param port: Port opened by the master node. If the environment variable `CLEARML_MULTI_NODE_MASTER_DEF_PORT`
@ -1702,8 +1711,13 @@ class Task(_Task):
:param queue: The queue to enqueue the nodes to. Can be different than the queue the master
node is enqueued to. If None, the nodes will be enqueued to the same queue as the master node
:param wait: If True, the master node will wait for the other nodes to start
:param addr: The address of the master node's worker. If not set, it defaults to the private IP
of the machine the master is running on
:param addr: The address of the master node's worker. If the environment variable
`CLEARML_MULTI_NODE_MASTER_DEF_ADDR` is set, the value of this parameter will be set to
the one defined in `CLEARML_MULTI_NODE_MASTER_DEF_ADDR`.
If `CLEARML_MULTI_NODE_MASTER_DEF_ADDR` doesn't exist, but `MASTER_ADDR` does, then the value of this
parameter will be set to the one defined in `MASTER_ADDR`. If neither environment variables exist,
the value passed to the parameter will be used. If this value is None (default), the private IP of
the machine the master node is running on will be used.
:return: A dictionary containing relevant information regarding the multi node run. This dictionary
has the following entries:
@ -1724,10 +1738,14 @@ class Task(_Task):
raise UsageError("Master task is not enqueued to any queue and the queue parameter is None")
master_conf = {
"master_addr": get_private_ip(),
"master_port": int(os.environ.get("CLEARML_MULTI_NODE_MASTER_DEF_PORT", os.environ.get("MASTER_PORT", port))),
"master_addr": os.environ.get(
"CLEARML_MULTI_NODE_MASTER_DEF_ADDR", os.environ.get("MASTER_ADDR", addr or get_private_ip())
),
"master_port": int(
os.environ.get("CLEARML_MULTI_NODE_MASTER_DEF_PORT", os.environ.get("MASTER_PORT", port))
),
"node_rank": 0,
"wait": wait
"wait": wait,
}
editable_conf = {"total_num_nodes": total_num_nodes, "queue": queue}
editable_conf = self.connect(editable_conf, name=self._launch_multi_node_section)
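(A hedged usage sketch of ``launch_multi_node`` with the new address resolution; the queue name is a placeholder.)
from clearml import Task
task = Task.init(project_name="examples", task_name="multi node training")
config = task.launch_multi_node(total_num_nodes=4, port=29500, queue="gpu-queue", wait=True)
# Every node, master included, receives its own rank plus the shared master address and port.
print("rank {} -> {}:{}".format(config["node_rank"], config["master_addr"], config["master_port"]))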
@ -4650,14 +4668,3 @@ class Task(_Task):
auto_connect_frameworks={'detect_repository': False}) \
if state['main'] else Task.get_task(task_id=state['id'])
self.__dict__ = task.__dict__
def __getattr__(self, name):
try:
self.__getattribute__(name)
except AttributeError as e:
if self.__class__ is Task:
getLogger().warning(
"'clearml.Task' object has no attribute '{}'. Did you mean to import 'Task' from 'allegroai'?".format(name)
)
raise e

View File

@ -1 +1 @@
__version__ = '1.10.4'
__version__ = '1.11.0'

View File

@ -1,11 +1,14 @@
# ClearML - Example of LightGBM integration
#
import lightgbm as lgb
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import mean_squared_error
from clearml import Task
def main():
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init(project_name="examples", task_name="LightGBM")
@ -14,7 +17,6 @@ print('Loading data...')
# Load or create your dataset
df_train = pd.read_csv(
'https://raw.githubusercontent.com/microsoft/LightGBM/master/examples/regression/regression.train',
header=None, sep='\t'
@ -38,24 +40,32 @@ params = {
'boosting_type': 'gbdt',
'objective': 'regression',
'metric': {'l2', 'l1'},
'num_leaves': 31,
'num_leaves': 200,
'max_depth': 0,
'learning_rate': 0.05,
'feature_fraction': 0.9,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': 0,
'force_col_wise': True,
'deterministic': True,
}
evals_result = {} # to record eval results for plotting
print('Starting training...')
# Train
gbm = lgb.train(
params,
lgb_train,
num_boost_round=20,
valid_sets=[lgb_eval],
callbacks=[lgb.early_stopping(stopping_rounds=5)],
num_boost_round=500,
valid_sets=[lgb_train, lgb_eval],
feature_name=[f'f{i + 1}' for i in range(X_train.shape[-1])],
categorical_feature=[21],
callbacks=[
lgb.record_evaluation(evals_result),
],
)
print('Saving model...')
@ -63,6 +73,21 @@ print('Saving model...')
# Save model to file
gbm.save_model('model.txt')
print('Plotting metrics recorded during training...')
ax = lgb.plot_metric(evals_result, metric='l1')
plt.show()
print('Plotting feature importances...')
ax = lgb.plot_importance(gbm, max_num_features=10)
plt.show()
print('Plotting split value histogram...')
ax = lgb.plot_split_value_histogram(gbm, feature='f26', bins='auto')
plt.show()
print('Loading model to predict...')
# Load model to predict
@ -73,3 +98,7 @@ y_pred = bst.predict(X_test)
# Eval with loaded model
print("The rmse of loaded model's prediction is:", mean_squared_error(y_test, y_pred) ** 0.5)
if __name__ == '__main__':
main()

View File

@ -1,4 +1,5 @@
lightgbm
scikit-learn
pandas
matplotlib
clearml

View File

@ -109,7 +109,7 @@
"metadata": {},
"outputs": [],
"source": [
"columns_categories = data_task.artifacts[\"Categries per column\"].get()\n",
"columns_categories = data_task.artifacts[\"Categories per column\"].get()\n",
"columns_categories_ordered = {\n",
" key: columns_categories[key]\n",
" for key in train_set.columns\n",