revital 2023-10-18 20:54:44 +03:00
commit fe5601a8ca
25 changed files with 360 additions and 62 deletions

.gitignore vendored
View File

@@ -7,8 +7,9 @@
# Python
*.pyc
__pycache__
-build/
-dist/
+/build/
+/dist/
+*/conda_build/build/
*.egg-info
.env
.venv/

View File

@@ -182,7 +182,7 @@ If ClearML is part of your development process / project / publication, please c
```
@misc{clearml,
    title = {ClearML - Your entire MLOps stack in one open-source tool},
-    year = {2019},
+    year = {2023},
    note = {Software available from http://github.com/allegroai/clearml},
    url={https://clear.ml/},
    author = {ClearML},

View File

@@ -99,6 +99,8 @@ class PipelineController(object):
        monitor_artifacts = attrib(type=list, default=None)  # List of artifact names to monitor
        monitor_models = attrib(type=list, default=None)  # List of models to monitor
        explicit_docker_image = attrib(type=str, default=None)  # The Docker image the node uses, specified at creation
+       recursively_parse_parameters = attrib(type=bool, default=False)  # if True, recursively parse parameters in
+       # lists, dicts, or tuples

        def __attrs_post_init__(self):
            if self.parents is None:

@@ -384,7 +386,8 @@ class PipelineController(object):
            cache_executed_step=False,  # type: bool
            base_task_factory=None,  # type: Optional[Callable[[PipelineController.Node], Task]]
            retry_on_failure=None,  # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]]  # noqa
-           status_change_callback=None  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
+           status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
+           recursively_parse_parameters=False  # type: bool
    ):
        # type: (...) -> bool
        """

@@ -405,7 +408,10 @@ class PipelineController(object):
            - Parameter access ``parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }``
            - Pipeline Task argument (see `Pipeline.add_parameter`) ``parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }``
            - Task ID ``parameter_override={'Args/input_file': '${stage3.id}' }``
+       :param recursively_parse_parameters: If True, recursively parse parameters from parameter_override in lists, dicts, or tuples.
+           Example:
+           - ``parameter_override={'Args/input_file': ['${<step_name>.artifacts.<artifact_name>.url}', 'file2.txt']}`` will be correctly parsed.
+           - ``parameter_override={'Args/input_file': ('${<step_name_1>.parameters.Args/input_file}', '${<step_name_2>.parameters.Args/input_file}')}`` will be correctly parsed.
        :param configuration_overrides: Optional, override Task configuration objects.
            Expected dictionary of configuration object name and configuration object content.
            Examples:

@@ -572,6 +578,7 @@ class PipelineController(object):
            name=name, base_task_id=base_task_id, parents=parents or [],
            queue=execution_queue, timeout=time_limit,
            parameters=parameter_override or {},
+           recursively_parse_parameters=recursively_parse_parameters,
            configurations=configuration_overrides,
            clone_task=clone_base_task,
            task_overrides=task_overrides,

@@ -2237,7 +2244,7 @@ class PipelineController(object):
        updated_hyper_parameters = {}
        for k, v in node.parameters.items():
-           updated_hyper_parameters[k] = self._parse_step_ref(v)
+           updated_hyper_parameters[k] = self._parse_step_ref(v, recursive=node.recursively_parse_parameters)

        task_overrides = self._parse_task_overrides(node.task_overrides) if node.task_overrides else None

@@ -2776,11 +2783,12 @@ class PipelineController(object):
        except Exception:
            pass

-   def _parse_step_ref(self, value):
+   def _parse_step_ref(self, value, recursive=False):
        # type: (Any) -> Optional[str]
        """
        Return the step reference. For example "${step1.parameters.Args/param}"
        :param value: string
+       :param recursive: if True, recursively parse all values in the dict, list or tuple
        :return:
        """
        # look for all the step references

@@ -2793,6 +2801,18 @@ class PipelineController(object):
            if not isinstance(new_val, six.string_types):
                return new_val
            updated_value = updated_value.replace(g, new_val, 1)
+       # if we have a dict, list or tuple, we need to recursively update the values
+       if recursive:
+           if isinstance(value, dict):
+               updated_value = {}
+               for k, v in value.items():
+                   updated_value[k] = self._parse_step_ref(v, recursive=True)
+           elif isinstance(value, list):
+               updated_value = [self._parse_step_ref(v, recursive=True) for v in value]
+           elif isinstance(value, tuple):
+               updated_value = tuple(self._parse_step_ref(v, recursive=True) for v in value)
        return updated_value

    def _parse_task_overrides(self, task_overrides):

@@ -3201,8 +3221,10 @@ class PipelineController(object):
            name=artifact_name,
            artifact_object=artifact_object,
            wait_on_upload=True,
-           extension_name=".pkl" if isinstance(artifact_object, dict) and
-               not self._artifact_serialization_function else None,
+           extension_name=(
+               ".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function
+               else None
+           ),
            serialization_function=self._artifact_serialization_function
        )
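
For reference, a minimal usage sketch of the new ``recursively_parse_parameters`` flag. Project, task, step, and artifact names below are placeholders, not taken from this commit:

```python
from clearml import PipelineController

# Hypothetical two-step pipeline: the parameter_override value is a list, so the
# step reference inside it is only resolved when recursively_parse_parameters=True.
pipe = PipelineController(name="demo pipeline", project="examples", version="1.0.0")
pipe.add_step(name="stage_data", base_task_project="examples", base_task_name="data task")
pipe.add_step(
    name="stage_train",
    parents=["stage_data"],
    base_task_project="examples",
    base_task_name="train task",
    parameter_override={
        "Args/input_files": ["${stage_data.artifacts.dataset.url}", "file2.txt"],
    },
    recursively_parse_parameters=True,
)
pipe.start_locally()
```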

View File

@@ -3075,6 +3075,8 @@ class GetScalarMetricDataRequest(Request):
    :param model_events: If set then the retrieving model events. Otherwise task
        events
    :type model_events: bool
+   :param scroll_id: Pass this value on next call to get next page
+   :type scroll_id: str
    """

    _service = "events"

@@ -3095,16 +3097,21 @@ class GetScalarMetricDataRequest(Request):
                "type": ["boolean", "null"],
            },
            "task": {"description": "task ID", "type": ["string", "null"]},
+           "scroll_id": {
+               "description": "Pass this value on next call to get next page",
+               "type": "string",
+           },
        },
        "type": "object",
    }

-   def __init__(self, task=None, metric=None, no_scroll=False, model_events=False, **kwargs):
+   def __init__(self, task=None, metric=None, no_scroll=False, model_events=False, scroll_id=None, **kwargs):
        super(GetScalarMetricDataRequest, self).__init__(**kwargs)
        self.task = task
        self.metric = metric
        self.no_scroll = no_scroll
        self.model_events = model_events
+       self.scroll_id = scroll_id

    @schema_property("task")
    def task(self):

@@ -3158,6 +3165,19 @@ class GetScalarMetricDataRequest(Request):
        self.assert_isinstance(value, "model_events", (bool,))
        self._property_model_events = value

+   @schema_property("scroll_id")
+   def scroll_id(self):
+       return self._property_scroll_id
+
+   @scroll_id.setter
+   def scroll_id(self, value):
+       if value is None:
+           self._property_scroll_id = None
+           return
+       self.assert_isinstance(value, "scroll_id", six.string_types)
+       self._property_scroll_id = value


class GetScalarMetricDataResponse(Response):
    """

View File

@@ -25,6 +25,8 @@ class InterfaceBase(SessionInterface):
    _default_session = None
    _num_retry_warning_display = 1
    _offline_mode = ENV_OFFLINE_MODE.get()
+   _JSON_EXCEPTION = (jsonschema.ValidationError, requests.exceptions.InvalidJSONError) \
+       if hasattr(requests.exceptions, "InvalidJSONError") else (jsonschema.ValidationError,)

    @property
    def session(self):

@@ -83,7 +85,7 @@ class InterfaceBase(SessionInterface):
            if raise_on_errors:
                raise
            res = None
-       except jsonschema.ValidationError as e:
+       except cls._JSON_EXCEPTION as e:
            if log:
                log.error(
                    'Field %s contains illegal schema: %s', '.'.join(e.path), str(e.message)

View File

@@ -12,6 +12,7 @@ from ..storage import StorageManager
from ..storage.helper import StorageHelper
from ..utilities.async_manager import AsyncManagerMixin

ModelPackage = namedtuple("ModelPackage", "weights design")

@@ -77,6 +78,28 @@ class Model(IdObjectBase, AsyncManagerMixin, _StorageUriMixin):
        self.send(models.SetReadyRequest(model=self.id, publish_task=False))
        self.reload()

+   def archive(self):
+       if Session.check_min_api_server_version("2.13"):
+           self.send(models.ArchiveManyRequest(ids=[self.id]))
+           self.reload()
+       else:
+           from ..model import BaseModel
+           # edit will reload
+           self._edit(
+               system_tags=list(set((self.data.system_tags or []) if hasattr(self.data, "system_tags") else []) | {BaseModel._archived_tag})
+           )
+
+   def unarchive(self):
+       if Session.check_min_api_server_version("2.13"):
+           self.send(models.UnarchiveManyRequest(ids=[self.id]))
+           self.reload()
+       else:
+           from ..model import BaseModel
+           # edit will reload
+           self._edit(
+               system_tags=list(set((self.data.system_tags or []) if hasattr(self.data, "system_tags") else []) - {BaseModel._archived_tag})
+           )

    def _reload(self):
        """Reload the model object"""
        if self._offline_mode:

View File

@@ -46,6 +46,7 @@ class CreateAndPopulate(object):
        output_uri=None,  # type: Optional[str]
        base_task_id=None,  # type: Optional[str]
        add_task_init_call=True,  # type: bool
+       force_single_script_file=False,  # type: bool
        raise_on_missing_entries=False,  # type: bool
        verbose=False,  # type: bool
    ):

@@ -84,6 +85,7 @@ class CreateAndPopulate(object):
        :param base_task_id: Use a pre-existing task in the system, instead of a local repo/script.
            Essentially clones an existing task and overrides arguments/requirements.
        :param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution.
+       :param force_single_script_file: If True, do not auto-detect local repository
        :param raise_on_missing_entries: If True, raise ValueError on missing entries when populating
        :param verbose: If True, print verbose logging
        """

@@ -125,6 +127,7 @@ class CreateAndPopulate(object):
        self.task_type = task_type
        self.output_uri = output_uri
        self.task = None
+       self.force_single_script_file = bool(force_single_script_file)
        self.raise_on_missing_entries = raise_on_missing_entries
        self.verbose = verbose

@@ -159,6 +162,7 @@ class CreateAndPopulate(object):
            detect_jupyter_notebook=False,
            add_missing_installed_packages=True,
            detailed_req_report=False,
+           force_single_script=self.force_single_script_file,
        )

        # check if we have no repository and no requirements raise error

@@ -237,6 +241,23 @@ class CreateAndPopulate(object):
            task_state['script']['diff'] = ''
            task_state['script']['working_dir'] = cwd or '.'
            task_state['script']['entry_point'] = entry_point or ""
+
+           if self.force_single_script_file and Path(self.script).is_file():
+               create_requirements = self.packages is True
+               repo_info, requirements = ScriptInfo.get(
+                   filepaths=[Path(self.script).as_posix()],
+                   log=getLogger(),
+                   create_requirements=create_requirements,
+                   uncommitted_from_remote=True,
+                   detect_jupyter_notebook=False,
+                   add_missing_installed_packages=True,
+                   detailed_req_report=False,
+                   force_single_script=self.force_single_script_file,
+               )
+               task_state['script']['diff'] = repo_info.script['diff'] or ''
+               task_state['script']['entry_point'] = repo_info.script['entry_point']
+               if create_requirements:
+                   task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
        else:
            # standalone task
            task_state['script']['entry_point'] = self.script or ""

View File

@@ -246,7 +246,6 @@ class PatchOsFork(object):
        os._exit = _at_exit_callback

    @staticmethod
    def _patched_fork(*args, **kwargs):
        if not PatchOsFork._current_task:

View File

@@ -71,7 +71,7 @@ class PatchLIGHTgbmModelIO(PatchBaseModelIO):
        return ret

    @staticmethod
-   def _load(original_fn, model_file, *args, **kwargs):
+   def _load(original_fn, model_file=None, *args, **kwargs):
        if not PatchLIGHTgbmModelIO._current_task:
            return original_fn(model_file, *args, **kwargs)

View File

@@ -1589,6 +1589,11 @@ class PatchKerasModelIO(object):
            from keras import models as keras_saving  # noqa
        except ImportError:
            keras_saving = None
+       try:
+           from keras.src.saving import saving_api as keras_saving_v3
+       except ImportError:
+           keras_saving_v3 = None

        # check that we are not patching anything twice
        if PatchKerasModelIO.__patched_tensorflow:
            PatchKerasModelIO.__patched_keras = [

@@ -1598,9 +1603,10 @@ class PatchKerasModelIO(object):
                Functional if PatchKerasModelIO.__patched_tensorflow[3] != Functional else None,
                None,
                None,
+               keras_saving_v3
            ]
        else:
-           PatchKerasModelIO.__patched_keras = [Network, Sequential, keras_saving, Functional, None, None]
+           PatchKerasModelIO.__patched_keras = [Network, Sequential, keras_saving, Functional, None, None, keras_saving_v3]
        PatchKerasModelIO._patch_io_calls(*PatchKerasModelIO.__patched_keras)

        if 'tensorflow' in sys.modules and not PatchKerasModelIO.__patched_tensorflow:

@@ -1643,6 +1649,8 @@ class PatchKerasModelIO(object):
        except ImportError:
            keras_hdf5 = None

+       keras_saving_v3 = None

        if PatchKerasModelIO.__patched_keras:
            PatchKerasModelIO.__patched_tensorflow = [
                Network if PatchKerasModelIO.__patched_keras[0] != Network else None,

@@ -1651,14 +1659,23 @@ class PatchKerasModelIO(object):
                Functional if PatchKerasModelIO.__patched_keras[3] != Functional else None,
                keras_saving_legacy if PatchKerasModelIO.__patched_keras[4] != keras_saving_legacy else None,
                keras_hdf5 if PatchKerasModelIO.__patched_keras[5] != keras_hdf5 else None,
+               keras_saving_v3 if PatchKerasModelIO.__patched_keras[6] != keras_saving_v3 else None,
            ]
        else:
            PatchKerasModelIO.__patched_tensorflow = [
-               Network, Sequential, keras_saving, Functional, keras_saving_legacy, keras_hdf5]
+               Network, Sequential, keras_saving, Functional, keras_saving_legacy, keras_hdf5, keras_saving_v3]
        PatchKerasModelIO._patch_io_calls(*PatchKerasModelIO.__patched_tensorflow)

    @staticmethod
-   def _patch_io_calls(Network, Sequential, keras_saving, Functional, keras_saving_legacy=None, keras_hdf5=None):
+   def _patch_io_calls(
+       Network,
+       Sequential,
+       keras_saving,
+       Functional,
+       keras_saving_legacy=None,
+       keras_hdf5=None,
+       keras_saving_v3=None
+   ):
        try:
            if Sequential is not None:
                Sequential._updated_config = _patched_call(Sequential._updated_config,

@@ -1718,6 +1735,9 @@ class PatchKerasModelIO(object):
            keras_hdf5.save_model_to_hdf5 = _patched_call(
                keras_hdf5.save_model_to_hdf5, PatchKerasModelIO._save_model)

+           if keras_saving_v3 is not None:
+               keras_saving_v3.save_model = _patched_call(keras_saving_v3.save_model, PatchKerasModelIO._save_model)

        except Exception as ex:
            LoggerRoot.get_base_logger(TensorflowBinding).warning(str(ex))

@@ -2058,6 +2078,11 @@ class PatchTensorflowModelIO(object):
                Checkpoint.write = _patched_call(Checkpoint.write, PatchTensorflowModelIO._ckpt_write)
            except Exception:
                pass
+           # noinspection PyBroadException
+           try:
+               Checkpoint._write = _patched_call(Checkpoint._write, PatchTensorflowModelIO._ckpt_write)
+           except Exception:
+               pass
        except ImportError:
            pass
        except Exception:

@@ -2227,21 +2252,24 @@ class PatchTensorflow2ModelIO(object):
            return
        PatchTensorflow2ModelIO.__patched = True

        # noinspection PyBroadException
        try:
            # hack: make sure tensorflow.__init__ is called
            import tensorflow  # noqa
            from tensorflow.python.training.tracking import util  # noqa

            # noinspection PyBroadException
            try:
-               util.TrackableSaver.save = _patched_call(util.TrackableSaver.save,
-                                                        PatchTensorflow2ModelIO._save)
+               util.TrackableSaver.save = _patched_call(util.TrackableSaver.save, PatchTensorflow2ModelIO._save)
            except Exception:
                pass

            # noinspection PyBroadException
            try:
-               util.TrackableSaver.restore = _patched_call(util.TrackableSaver.restore,
-                                                           PatchTensorflow2ModelIO._restore)
+               util.TrackableSaver.restore = _patched_call(
+                   util.TrackableSaver.restore, PatchTensorflow2ModelIO._restore
+               )
            except Exception:
                pass
        except ImportError:

@@ -2249,6 +2277,32 @@ class PatchTensorflow2ModelIO(object):
        except Exception:
            LoggerRoot.get_base_logger(TensorflowBinding).debug('Failed patching tensorflow v2')

+       # noinspection PyBroadException
+       try:
+           # hack: make sure tensorflow.__init__ is called
+           import tensorflow  # noqa
+           from tensorflow.python.checkpoint import checkpoint
+
+           # noinspection PyBroadException
+           try:
+               checkpoint.TrackableSaver.save = _patched_call(
+                   checkpoint.TrackableSaver.save, PatchTensorflow2ModelIO._save
+               )
+           except Exception:
+               pass
+
+           # noinspection PyBroadException
+           try:
+               checkpoint.TrackableSaver.restore = _patched_call(
+                   checkpoint.TrackableSaver.restore, PatchTensorflow2ModelIO._restore
+               )
+           except Exception:
+               pass
+       except ImportError:
+           pass
+       except Exception:
+           LoggerRoot.get_base_logger(TensorflowBinding).debug('Failed patching tensorflow v2.11')

    @staticmethod
    def _save(original_fn, self, file_prefix, *args, **kwargs):
        model = original_fn(self, file_prefix, *args, **kwargs)
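
The checkpoint hooks above all go through the same wrap-and-forward helper. A simplified, self-contained sketch of the idea (not ClearML's actual ``_patched_call`` implementation; ``SomeSaver`` and ``my_save_hook`` are hypothetical names):

```python
def patched_call(original_fn, patched_fn):
    """Return a wrapper that hands control to patched_fn.

    patched_fn receives the original callable as its first argument and decides
    when (or whether) to invoke it, mirroring the _save/_restore hooks above.
    """
    def _inner(*args, **kwargs):
        return patched_fn(original_fn, *args, **kwargs)
    return _inner


# usage sketch:
#   SomeSaver.save = patched_call(SomeSaver.save, my_save_hook)
# where my_save_hook(original_fn, self, file_prefix, ...) calls original_fn(...) and then logs the checkpoint
```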

View File

@@ -89,6 +89,7 @@ class PatchHydra(object):
                if overrides and not isinstance(overrides, (list, tuple)):
                    overrides = [overrides]
                overrides += ['{}={}'.format(k, v) for k, v in stored_config.items()]
+               overrides = [("+" + o) if (o.startswith("+") and not o.startswith("++")) else o for o in overrides]
            else:
                # We take care of it inside the _patched_run_job
                pass
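
The added line upgrades single ``+`` Hydra overrides to the ``++`` (add-or-override) form, so re-applying a stored configuration does not fail when the key already exists. A standalone illustration of the transformation, with made-up override values:

```python
overrides = ["trainer.epochs=10", "+trainer.precision=16", "++db.user=root"]
overrides = [("+" + o) if (o.startswith("+") and not o.startswith("++")) else o for o in overrides]
print(overrides)
# ['trainer.epochs=10', '++trainer.precision=16', '++db.user=root']
```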

View File

@@ -2497,7 +2497,7 @@ class Dataset(object):
        # check if target folder is not empty, see if it contains everything we need
        if target_base_folder and next(target_base_folder.iterdir(), None):
-           if self._verify_dataset_folder(target_base_folder, part, chunk_selection):
+           if self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
                target_base_folder.touch()
                self._release_lock_ds_target_folder(target_base_folder)
                return target_base_folder.as_posix()

@@ -2538,7 +2538,7 @@ class Dataset(object):
            raise_on_error=False, force=False)

        # verify entire dataset (if failed, force downloading parent datasets)
-       if not self._verify_dataset_folder(target_base_folder, part, chunk_selection):
+       if not self._verify_dataset_folder(target_base_folder, part, chunk_selection, max_workers):
            LoggerRoot.get_base_logger().info('Dataset parents need refreshing, re-fetching all parent datasets')
            # we should delete the entire cache folder
            self._extract_parent_datasets(

@@ -3214,31 +3214,42 @@ class Dataset(object):
            raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
        pool.close()

-   def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
-       # type: (Path, Optional[int], Optional[dict]) -> bool
+   def _verify_dataset_folder(self, target_base_folder, part, chunk_selection, max_workers):
+       # type: (Path, Optional[int], Optional[dict], Optional[int]) -> bool
+
+       def verify_file_or_link(base_folder, ds_part, ds_chunk_selection, file_entry):
+           # type: (Path, Optional[int], Optional[dict], FileEntry) -> Optional[bool]
+           # check if we need the file for the requested dataset part
+           if ds_part is not None:
+               f_parts = ds_chunk_selection.get(file_entry.parent_dataset_id, [])
+               # file is not in requested dataset part, no need to check it.
+               if self._get_chunk_idx_from_artifact_name(file_entry.artifact_name) not in f_parts:
+                   return None
+           # check if the local size and the stored size match (faster than comparing hash)
+           if (base_folder / file_entry.relative_path).stat().st_size != file_entry.size:
+               return False
+           return True
+
        target_base_folder = Path(target_base_folder)
        # check dataset file size, if we have a full match no need for parent dataset download / merge
        verified = True
        # noinspection PyBroadException
        try:
-           for f in self._dataset_file_entries.values():
-               # check if we need it for the current part
-               if part is not None:
-                   f_parts = chunk_selection.get(f.parent_dataset_id, [])
-                   # this is not in our current part, no need to check it.
-                   if self._get_chunk_idx_from_artifact_name(f.artifact_name) not in f_parts:
-                       continue
-               # check if the local size and the stored size match (faster than comparing hash)
-               if (target_base_folder / f.relative_path).stat().st_size != f.size:
-                   verified = False
-                   break
-           for f in self._dataset_link_entries.values():
-               if (target_base_folder / f.relative_path).stat().st_size != f.size:
-                   verified = False
-                   break
+           futures_ = []
+           with ThreadPoolExecutor(max_workers=max_workers) as tp:
+               for f in self._dataset_file_entries.values():
+                   future = tp.submit(verify_file_or_link, target_base_folder, part, chunk_selection, f)
+                   futures_.append(future)
+
+               for f in self._dataset_link_entries.values():
+                   # don't check whether link is in dataset part, hence None for part and chunk_selection
+                   future = tp.submit(verify_file_or_link, target_base_folder, None, None, f)
+                   futures_.append(future)
+
+           verified = all(f.result() is not False for f in futures_)
        except Exception:
            verified = False
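
From the caller's side nothing new is exposed; the existing ``max_workers`` argument now also drives the verification pass. A hedged usage sketch, assuming ``Dataset.get_local_copy`` already accepts ``max_workers`` as in recent releases (dataset project and name are placeholders):

```python
from clearml import Dataset

ds = Dataset.get(dataset_project="examples", dataset_name="my_dataset")
# max_workers controls both the download and, after this change, the size-verification threads
local_path = ds.get_local_copy(max_workers=8)
```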

View File

@@ -1055,6 +1055,26 @@ class BaseModel(object):
        if not self.published:
            self._get_base_model().publish()

+   def archive(self):
+       # type: () -> ()
+       """
+       Archive the model. If the model is already archived, this is a no-op
+       """
+       try:
+           self._get_base_model().archive()
+       except Exception:
+           pass
+
+   def unarchive(self):
+       # type: () -> ()
+       """
+       Unarchive the model. If the model is not archived, this is a no-op
+       """
+       try:
+           self._get_base_model().unarchive()
+       except Exception:
+           pass
+
    def _init_reporter(self):
        if self._reporter:
            return

@@ -2380,11 +2400,15 @@ class OutputModel(BaseModel):
        # make sure the created model is updated:
        out_model_file_name = target_filename or weights_filename or register_uri
-       name = (
-           Path(out_model_file_name).stem
-           if out_model_file_name
-           else (self._task_connect_name or "Output Model")
-       )
+       # prefer self._task_connect_name if exists
+       if self._task_connect_name:
+           name = self._task_connect_name
+       elif out_model_file_name:
+           name = Path(out_model_file_name).stem
+       else:
+           name = "Output Model"
+
        if not self._base_model:
            model = self._get_force_base_model(task_model_entry=name)
        else:
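
A minimal usage sketch for the new public ``archive``/``unarchive`` methods, assuming an existing model ID (placeholder below):

```python
from clearml import InputModel

model = InputModel(model_id="<model_id>")  # placeholder ID
model.archive()    # no-op if the model is already archived
model.unarchive()  # no-op if the model is not archived
```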

View File

@@ -894,7 +894,7 @@ class _GoogleCloudStorageDriver(_Driver):
        obj.download_to_filename(str(p))

    def test_upload(self, test_path, config, **_):
-       bucket_url = str(furl(scheme=self.scheme, netloc=config.bucket, path=config.subdir))
+       bucket_url = str(furl(scheme=self.scheme, netloc=config.bucket))
        bucket = self.get_container(container_name=bucket_url, config=config).bucket
        test_obj = bucket

View File

@@ -12,7 +12,7 @@ from pathlib2 import Path
from .cache import CacheManager
from .callbacks import ProgressReport
from .helper import StorageHelper
-from .util import encode_string_to_filename, safe_extract
+from .util import encode_string_to_filename, safe_extract, create_zip_directories
from ..debugging.log import LoggerRoot
from ..config import deferred_config

@@ -163,7 +163,9 @@ class StorageManager(object):
            temp_target_folder.mkdir(parents=True, exist_ok=True)
            if suffix == ".zip":
-               ZipFile(cached_file.as_posix()).extractall(path=temp_target_folder.as_posix())
+               zip_file = ZipFile(cached_file.as_posix())
+               create_zip_directories(zip_file, path=temp_target_folder.as_posix())
+               zip_file.extractall(path=temp_target_folder.as_posix())
            elif suffix == ".tar.gz":
                with tarfile.open(cached_file.as_posix()) as file:
                    safe_extract(file, temp_target_folder.as_posix())

View File

@@ -1,7 +1,7 @@
import fnmatch
import hashlib
import json
-import os.path
+import os
import re
import sys
from typing import Optional, Union, Sequence, Dict

@@ -338,6 +338,37 @@ def is_within_directory(directory, target):
    return prefix == abs_directory

+def create_zip_directories(zipfile, path=None):
+    try:
+        path = os.getcwd() if path is None else os.fspath(path)
+        for member in zipfile.namelist():
+            arcname = member.replace("/", os.path.sep)
+            if os.path.altsep:
+                arcname = arcname.replace(os.path.altsep, os.path.sep)
+            # interpret absolute pathname as relative, remove drive letter or
+            # UNC path, redundant separators, "." and ".." components.
+            arcname = os.path.splitdrive(arcname)[1]
+            invalid_path_parts = ("", os.path.curdir, os.path.pardir)
+            arcname = os.path.sep.join(x for x in arcname.split(os.path.sep) if x not in invalid_path_parts)
+            if os.path.sep == "\\":
+                # noinspection PyBroadException
+                try:
+                    # filter illegal characters on Windows
+                    # noinspection PyProtectedMember
+                    arcname = zipfile._sanitize_windows_name(arcname, os.path.sep)
+                except Exception:
+                    pass
+
+            targetpath = os.path.normpath(os.path.join(path, arcname))
+
+            # Create all upper directories if necessary.
+            upperdirs = os.path.dirname(targetpath)
+            if upperdirs:
+                os.makedirs(upperdirs, exist_ok=True)
+    except Exception as e:
+        LoggerRoot.get_base_logger().warning("Failed creating zip directories: " + str(e))
+

def safe_extract(tar, path=".", members=None, numeric_owner=False):
    """Tarfile member sanitization (addresses CVE-2007-4559)"""
    for member in tar.getmembers():

View File

@@ -791,6 +791,7 @@ class Task(_Task):
        argparse_args=None,  # type: Optional[Sequence[Tuple[str, str]]]
        base_task_id=None,  # type: Optional[str]
        add_task_init_call=True,  # type: bool
+       force_single_script_file=False,  # type: bool
    ):
        # type: (...) -> TaskInstance
        """

@@ -832,6 +833,7 @@ class Task(_Task):
        :param base_task_id: Use a pre-existing task in the system, instead of a local repo/script.
            Essentially clones an existing task and overrides arguments/requirements.
        :param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution.
+       :param force_single_script_file: If True, do not auto-detect local repository
        :return: The newly created Task (experiment)
        :rtype: Task

@@ -852,6 +854,7 @@ class Task(_Task):
            docker=docker, docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script,
            base_task_id=base_task_id,
            add_task_init_call=add_task_init_call,
+           force_single_script_file=force_single_script_file,
            raise_on_missing_entries=False,
        )
        task = manual_populate.create_task()
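
A hedged example of the new flag on ``Task.create``; the script path is a placeholder. With ``force_single_script_file=True`` the surrounding local repository is not auto-detected and only the single script is packaged:

```python
from clearml import Task

task = Task.create(
    project_name="examples",
    task_name="single file task",
    script="train.py",  # placeholder path
    force_single_script_file=True,  # do not auto-detect the local git repository
)
```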

View File

@@ -285,11 +285,11 @@ class GPUStatCollection(object):
        for nv_process in nv_comp_processes + nv_graphics_processes:
            try:
                process = get_process_info(nv_process)
-               processes.append(process)
            except psutil.NoSuchProcess:
                # TODO: add some reminder for NVML broken context
                # e.g. nvidia-smi reset or reboot the system
-               pass
+               process = None
+           processes.append(process)

        # we do not actually use these, so no point in collecting them
        # # TODO: Do not block if full process info is not requested

@@ -313,7 +313,7 @@ class GPUStatCollection(object):
            # Convert bytes into MBytes
            'memory.used': memory.used // MB if memory else None,
            'memory.total': memory.total // MB if memory else None,
-           'processes': processes,
+           'processes': None if (processes and all(p is None for p in processes)) else processes
        }
        if per_process_stats:
            GPUStatCollection.clean_processes()

View File

@@ -284,6 +284,11 @@ def is_std_or_local_lib(name):
        False if installed package
        str if local library
    """
+   # check if one of the builtin modules first
+   if name in sys.builtin_module_names:
+       return True
+
    exist = True
    if six.PY2:
        import imp  # noqa
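
The early return covers modules compiled into the interpreter, which have no file on disk and would otherwise confuse the file-based detection below. For example:

```python
import sys

print("sys" in sys.builtin_module_names)   # True: compiled into the interpreter
print("json" in sys.builtin_module_names)  # False: a regular stdlib package on disk
```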

View File

@@ -287,7 +287,7 @@ class WrapperBase(type):
    # (http://code.activestate.com/recipes/496741/). It adds special methods
    # to the wrapper class so it can proxy the wrapped class. In addition, it
    # adds a field __overrides__ in the wrapper class dictionary, containing
-   # all attributes decorated to be overriden.
+   # all attributes decorated to be overridden.

    _special_names = [
        '__abs__', '__add__', '__and__', '__call__', '__cmp__', '__coerce__',

View File

@@ -43,6 +43,8 @@ class ResourceMonitor(BackgroundMonitor):
        self._process_info = psutil.Process() if report_mem_used_per_process else None
        self._last_process_pool = {}
        self._last_process_id_list = []
+       self._gpu_memory_per_process = True

        if not self._gpustat:
            self._task.get_logger().report_text('ClearML Monitor: GPU monitoring is not available')
        else:  # if running_remotely():

@@ -309,27 +311,40 @@ class ResourceMonitor(BackgroundMonitor):
        # On the rest of the samples we return the previous memory measurement

        # update mem used by our process and sub processes
-       if self._process_info and (not self._last_process_pool.get('gpu') or
-                                  (time() - self._last_process_pool['gpu'][0]) >= self._report_frequency):
-           gpu_stat = self._gpustat.new_query(per_process_stats=True)
-           gpu_mem = {}
+       if self._gpu_memory_per_process and self._process_info and \
+               (not self._last_process_pool.get('gpu') or
+                (time() - self._last_process_pool['gpu'][0]) >= self._report_frequency):
+           gpu_mem = {}
+           # noinspection PyBroadException
+           try:
+               gpu_stat = self._gpustat.new_query(per_process_stats=True)
+           except Exception:
+               gpu_stat = self._gpustat.new_query(per_process_stats=False)

            for i, g in enumerate(gpu_stat.gpus):
+               # if processes is None, that means we can't query GPU memory usage per process, so we can stop
+               if g.processes is None:
+                   self._gpu_memory_per_process = False
+                   break
                # only monitor the active gpu's, if none were selected, monitor everything
                if self._active_gpus and i not in self._active_gpus:
                    continue

                gpu_mem[i] = 0
                for p in g.processes:
-                   if p['pid'] in self._last_process_id_list:
+                   if p is not None and p['pid'] in self._last_process_id_list:
                        gpu_mem[i] += p.get('gpu_memory_usage', 0)

            self._last_process_pool['gpu'] = time(), gpu_mem
        else:
            # if we do no need to update the memory usage, run global query
            # if we have no parent process (backward compatibility), return global stats
-           gpu_stat = self._gpustat.new_query()
+           gpu_stat = self._gpustat.new_query(per_process_stats=False)
            gpu_mem = self._last_process_pool['gpu'][1] if self._last_process_pool.get('gpu') else None

        # generate the statistics dict for actual report
        stats = {}
        for i, g in enumerate(gpu_stat.gpus):
            # only monitor the active gpu's, if none were selected, monitor everything
            if self._active_gpus and i not in self._active_gpus:

@@ -367,7 +382,7 @@ class ResourceMonitor(BackgroundMonitor):
        specs.update(
            gpu_count=int(len(gpus)),
            gpu_type=', '.join(g.name for g in gpus),
-           gpu_memory=', '.join('{}GB'.format(round(g.memory_total/1024.0)) for g in gpus),
+           gpu_memory=', '.join('{}GB'.format(round(g.memory_total / 1024.0)) for g in gpus),
            gpu_driver_version=gpu_stat.driver_version or '',
            gpu_driver_cuda_version=gpu_stat.driver_cuda_version or '',
        )

View File

@@ -1 +1 @@
-__version__ = '1.12.2'
+__version__ = '1.13.1'

View File

@@ -0,0 +1,28 @@
import numpy as np
import keras

from clearml import Task


def get_model():
    # Create a simple model.
    inputs = keras.Input(shape=(32,))
    outputs = keras.layers.Dense(1)(inputs)
    model = keras.Model(inputs, outputs)
    model.compile(optimizer=keras.optimizers.Adam(), loss="mean_squared_error")
    return model


Task.init(project_name="examples", task_name="keras_v3")

model = get_model()

test_input = np.random.random((128, 32))
test_target = np.random.random((128, 1))
model.fit(test_input, test_target)

model.save("my_model.keras")

reconstructed_model = keras.models.load_model("my_model.keras")
np.testing.assert_allclose(
    model.predict(test_input), reconstructed_model.predict(test_input)
)

View File

@@ -0,0 +1,36 @@
# ClearML - example code for logging configuration files to Task:
#
import json
from pathlib import Path

import yaml

from clearml import Task

# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init(project_name='FirstTrial', task_name='config_files_example')

# -----------------------------------------------
# Log config file
# Notice any file format is supported
# In the Web UI you can edit the configuration file directly as text
# and launch on a remote worker with the new configuration automatically applied
# -----------------------------------------------

config_file = task.connect_configuration(Path("data_samples") / "sample.json", name='json config file')
with open(config_file, "rt") as f:
    config_json = json.load(f)

print(config_json)

config_file = task.connect_configuration(Path("data_samples") / "config_yaml.yaml", name='yaml config file')
with open(config_file, "rt") as f:
    config_yaml = yaml.load(f, Loader=yaml.SafeLoader)

print(config_yaml)

print("done")

View File

@@ -11,7 +11,7 @@ Pillow>=4.1.1
psutil>=3.4.2
pyparsing>=2.0.3
python-dateutil>=2.6.1
-pyjwt>=2.4.0,<2.5.0 ; python_version > '3.5'
+pyjwt>=2.4.0,<2.9.0 ; python_version > '3.5'
pyjwt>=1.6.4,<2.0.0 ; python_version <= '3.5'
PyYAML>=3.12
requests>=2.20.0