revital 2023-08-28 14:16:29 +03:00
commit 0e8ebad30a
15 changed files with 1701 additions and 300 deletions


@@ -8,7 +8,7 @@
</br>Experiment Manager, MLOps and Data-Management**
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml.svg)](https://img.shields.io/github/license/allegroai/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)<br>
-[![PyPI Downloads](https://pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
+[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
</div>
@@ -16,7 +16,7 @@
### ClearML
<sup>*Formerly known as Allegro Trains*<sup>
-ClearML is a ML/DL development and production suite, it contains FIVE main modules:
+ClearML is a ML/DL development and production suite. It contains FIVE main modules:
- [Experiment Manager](#clearml-experiment-manager) - Automagical experiment tracking, environments and results
- [MLOps](https://github.com/allegroai/clearml-agent) - Orchestration, Automation & Pipelines solution for ML/DL jobs (K8s / Cloud / bare-metal)
@@ -73,7 +73,7 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
**Adding only 2 lines to your code gets you the following**
* Complete experiment setup log
-* Full source control info including non-committed local changes
+* Full source control info, including non-committed local changes
* Execution environment (including specific packages & versions)
* Hyper-parameters
* [`argparse`](https://docs.python.org/3/library/argparse.html)/[Click](https://github.com/pallets/click/)/[PythonFire](https://github.com/google/python-fire) for command line parameters with currently used values
@@ -122,7 +122,7 @@ below and follow the instructions:
task = Task.init(project_name='examples', task_name='hello world')
```
-You are done, everything your process outputs is now automagically logged into ClearML.
+And you are done! Everything your process outputs is now automagically logged into ClearML.
Next step, automation! **Learn more about ClearML's two-click automation [here](https://clear.ml/docs/latest/docs/getting_started/mlops/mlops_first_steps)**.
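For reference, the two-line integration described above amounts to something like the following (a minimal sketch; the project and task names are placeholders):

```python
from clearml import Task

# Creating the Task is the only required integration step; console output,
# framework checkpoints, and plots are captured automatically afterwards.
task = Task.init(project_name='examples', task_name='hello world')
```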
@@ -130,9 +130,9 @@ Next step, automation! **Learn more about ClearML's two-click automation [here](
The ClearML run-time components:
-* The ClearML Python Package for integrating ClearML into your existing scripts by adding just two lines of code, and optionally extending your experiments and other workflows with ClearML's powerful and versatile set of classes and methods.
-* The ClearML Server for storing experiment, model, and workflow data, and supporting the Web UI experiment manager, and MLOps automation for reproducibility and tuning. It is available as a hosted service and open source for you to deploy your own ClearML Server.
-* The ClearML Agent for MLOps orchestration, experiment and workflow reproducibility, and scalability.
+* The ClearML Python Package - for integrating ClearML into your existing scripts by adding just two lines of code, and optionally extending your experiments and other workflows with ClearML's powerful and versatile set of classes and methods.
+* The ClearML Server - for storing experiment, model, and workflow data; supporting the Web UI experiment manager and MLOps automation for reproducibility and tuning. It is available as a hosted service and open source for you to deploy your own ClearML Server.
+* The ClearML Agent - for MLOps orchestration, experiment and workflow reproducibility, and scalability.
<img src="https://raw.githubusercontent.com/allegroai/clearml-docs/main/docs/img/clearml_architecture.png" width="100%" alt="clearml-architecture">
@@ -142,7 +142,7 @@ The ClearML run-time components:
- [clearml-task](https://github.com/allegroai/clearml/blob/master/docs/clearml-task.md) - Run any codebase on remote machines with full remote logging of Tensorboard, Matplotlib & Console outputs
- [clearml-data](https://github.com/allegroai/clearml/blob/master/docs/datasets.md) - **CLI for managing and versioning your datasets, including creating / uploading / downloading of data from S3/GS/Azure/NAS**
- [AWS Auto-Scaler](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler) - Automatically spin EC2 instances based on your workloads with preconfigured budget! No need for K8s!
-- [Hyper-Parameter Optimization](https://clear.ml/docs/latest/docs/guides/optimization/hyper-parameter-optimization/examples_hyperparam_opt) - Optimize any code with black-box approach and state of the art Bayesian optimization algorithms
+- [Hyper-Parameter Optimization](https://clear.ml/docs/latest/docs/guides/optimization/hyper-parameter-optimization/examples_hyperparam_opt) - Optimize any code with black-box approach and state-of-the-art Bayesian optimization algorithms
- [Automation Pipeline](https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_controller) - Build pipelines based on existing experiments / jobs, supports building pipelines of pipelines!
- [Slack Integration](https://clear.ml/docs/latest/docs/guides/services/slack_alerts) - Report experiments progress / failure directly to Slack (fully customizable!)
@@ -159,11 +159,11 @@ and practices.
- Use it on a daily basis to boost collaboration and visibility in your team
- Create a remote job from any experiment with a click of a button
- Automate processes and create pipelines to collect your experimentation logs, outputs, and data
-- Store all you data on any object-storage solution, with the simplest interface possible
-- Make you data transparent by cataloging it all on the ClearML platform
+- Store all your data on any object-storage solution, with the most straightforward interface possible
+- Make your data transparent by cataloging it all on the ClearML platform
We believe ClearML is ground-breaking. We wish to establish new standards of true seamless integration between
-experiment management, MLOps and data management.
+experiment management, MLOps, and data management.
## Who We Are
@@ -172,8 +172,7 @@ ClearML is supported by you and the [clear.ml](https://clear.ml) team, which hel
We built ClearML to track and control the glorious but messy process of training production-grade deep learning models.
We are committed to vigorously supporting and expanding the capabilities of ClearML.
-We promise to always be backwardly compatible, making sure all your logs, data and pipelines
-will always upgrade with you.
+We promise to always be backwardly compatible, making sure all your logs, data, and pipelines will always upgrade with you.
## License
@@ -192,7 +191,7 @@ author = {ClearML},
## Documentation, Community & Support
-More information in the [official documentation](https://clear.ml/docs) and [on YouTube](https://www.youtube.com/c/ClearML).
+For more information, see the [official documentation](https://clear.ml/docs) and [on YouTube](https://www.youtube.com/c/ClearML).
For examples and use cases, check the [examples folder](https://github.com/allegroai/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides).


@@ -151,6 +151,7 @@ class PipelineController(object):
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str]
+always_create_from_code=True, # type: bool
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None # type: Optional[Callable[[bytes], Any]]
):
@@ -215,6 +216,9 @@
Use empty string ("") to disable any repository auto-detection
:param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used)
:param repo_commit: Optional, specify the repository commit ID (Ignored, if local repo path is used)
+:param always_create_from_code: If True (default), the pipeline is always constructed from code.
+If False, the pipeline is generated from the pipeline configuration section on the pipeline Task itself.
+This allows editing (also adding/removing) pipeline steps without changing the original codebase
:param artifact_serialization_function: A serialization function that takes one
parameter of any type which is the object to be serialized. The function should return
a `bytes` or `bytearray` object, which represents the serialized object. All parameter/return
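A minimal sketch of how the new `always_create_from_code` flag might be used (the pipeline, project, and step names below are placeholders, not taken from this commit):

```python
from clearml.automation import PipelineController

# With always_create_from_code=False the DAG is read from the pipeline Task's
# configuration section, so steps can be edited in the UI without code changes.
pipe = PipelineController(
    name="example pipeline",
    project="examples",
    version="1.0.0",
    always_create_from_code=False,
)
pipe.add_step(name="stage_one", base_task_project="examples", base_task_name="step 1 task")
pipe.start()
```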
@@ -244,6 +248,7 @@
self._start_time = None
self._pipeline_time_limit = None
self._default_execution_queue = None
+self._always_create_from_code = bool(always_create_from_code)
self._version = str(version).strip() if version else None
if self._version and not Version.is_valid_version_string(self._version):
raise ValueError(
@@ -785,7 +790,8 @@
pass
:param tags: A list of tags for the specific pipeline step.
-When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+When executing a Pipeline remotely
+(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful
"""
@@ -1412,7 +1418,7 @@
pipeline_object._nodes = {}
pipeline_object._running_nodes = []
try:
-pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section))
+pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section), force=True)
except Exception:
pass
return pipeline_object
@@ -1431,7 +1437,8 @@
# type: (Union[Sequence[str], str]) -> None
"""
Add tags to this pipeline. Old tags are not deleted.
-When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+When executing a Pipeline remotely
+(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param tags: A list of tags for this pipeline.
"""
@@ -1713,13 +1720,16 @@
return dag
-def _deserialize(self, dag_dict):
-# type: (dict) -> ()
+def _deserialize(self, dag_dict, force=False):
+# type: (dict, bool) -> ()
"""
Restore the DAG from a dictionary.
This will be used to create the DAG from the dict stored on the Task, when running remotely.
:return:
"""
+# if we always want to load the pipeline DAG from code, we are skipping the deserialization step
+if not force and self._always_create_from_code:
+return
# if we do not clone the Task, only merge the parts we can override.
for name in list(self._nodes.keys()):
@@ -2075,7 +2085,8 @@
pass
:param tags: A list of tags for the specific pipeline step.
-When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+When executing a Pipeline remotely
+(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: True if successful
"""
@@ -3190,7 +3201,8 @@
name=artifact_name,
artifact_object=artifact_object,
wait_on_upload=True,
-extension_name=".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function else None,
+extension_name=".pkl" if isinstance(artifact_object, dict) and
+not self._artifact_serialization_function else None,
serialization_function=self._artifact_serialization_function
)
@@ -3325,6 +3337,7 @@ class PipelineDecorator(PipelineController):
repo=repo,
repo_branch=repo_branch,
repo_commit=repo_commit,
+always_create_from_code=False,
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function
)
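For orientation, the decorator-based API builds the DAG by executing the decorated function, which is why it passes `always_create_from_code=False` above. A rough usage sketch (names and values are placeholders):

```python
from clearml.automation import PipelineDecorator

@PipelineDecorator.component(return_values=["doubled"], cache=True)
def double(x):
    return x * 2

@PipelineDecorator.pipeline(name="example decorator pipeline", project="examples", version="1.0.0")
def run_pipeline(x=1):
    print("result:", double(x))

if __name__ == "__main__":
    # execute the whole pipeline locally for debugging instead of enqueuing steps
    PipelineDecorator.run_locally()
    run_pipeline(x=3)
```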
@@ -3468,6 +3481,7 @@
# visualize pipeline state (plot)
self.update_execution_plot()
+self._scan_monitored_nodes()
if self._stop_event:
# noinspection PyBroadException
@@ -3803,7 +3817,8 @@
pass
:param tags: A list of tags for the specific pipeline step.
-When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+When executing a Pipeline remotely
+(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:return: function wrapper
"""
@@ -3955,8 +3970,9 @@
# Note that for the first iteration (when `_node.name == _node_name`)
# we always increment the name, as the name is always in `_launched_step_names`
while _node.name in cls._singleton._launched_step_names or (
-_node.name in cls._singleton._nodes
-and cls._singleton._nodes[_node.name].job_code_section != cls._singleton._nodes[_node_name].job_code_section
+_node.name in cls._singleton._nodes and
+cls._singleton._nodes[_node.name].job_code_section !=
+cls._singleton._nodes[_node_name].job_code_section
):
_node.name = "{}_{}".format(_node_name, counter)
counter += 1
@@ -4303,11 +4319,6 @@
a_pipeline._task._set_runtime_properties(
dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))
-# sync arguments back (post deserialization and casting back)
-for k in pipeline_kwargs.keys():
-if k in a_pipeline.get_parameters():
-pipeline_kwargs[k] = a_pipeline.get_parameters()[k]
# run the actual pipeline
if not start_controller_locally and \
not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
@@ -4315,8 +4326,14 @@
a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
# when we get here it means we are running remotely
+# this will also deserialize the pipeline and arguments
a_pipeline._start(wait=False)
+# sync arguments back (post deserialization and casting back)
+for k in pipeline_kwargs.keys():
+if k in a_pipeline.get_parameters():
+pipeline_kwargs[k] = a_pipeline.get_parameters()[k]
# this time the pipeline is executed only on the remote machine
try:
pipeline_result = func(**pipeline_kwargs)


@@ -384,10 +384,9 @@ class BaseJob(object):
section_overrides=None,
params_override=None,
configurations_override=None,
-explicit_docker_image=None,
-account_for_artifacts_hashes=True
+explicit_docker_image=None
):
-# type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str], bool) -> Optional[str]
+# type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str]
"""
Create Hash (str) representing the state of the Task
@@ -398,8 +397,6 @@
:param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
:param explicit_docker_image: The explicit docker image. Used to invalidate the hash when the docker image
was explicitly changed
-:param account_for_artifacts_hashes: Calculate the hash of the task by accounting for the hashes of the
-artifacts in `kwargs_artifacts` (as opposed of the task ID/artifact name stored in this section)
:return: str hash of the Task configuration
"""
@@ -417,25 +414,25 @@
return None
# we need to ignore `requirements` section because ir might be changing from run to run
+script = deepcopy(script)
script.pop("requirements", None)
hyper_params = deepcopy(task.get_parameters() if params_override is None else params_override)
-if account_for_artifacts_hashes:
-    hyper_params_to_change = {}
-    task_cache = {}
-    for key, value in hyper_params.items():
-        if key.startswith("kwargs_artifacts/"):
-            # noinspection PyBroadException
-            try:
-                # key format is <task_id>.<artifact_name>
-                task_id, artifact = value.split(".", 1)
-                task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
-                # set the value of the hyper parameter to the hash of the artifact
-                # because the task ID might differ, but the artifact might be the same
-                hyper_params_to_change[key] = task_.artifacts[artifact].hash
-            except Exception:
-                pass
-    hyper_params.update(hyper_params_to_change)
+hyper_params_to_change = {}
+task_cache = {}
+for key, value in hyper_params.items():
+    if key.startswith("kwargs_artifacts/"):
+        # noinspection PyBroadException
+        try:
+            # key format is <task_id>.<artifact_name>
+            task_id, artifact = value.split(".", 1)
+            task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
+            # set the value of the hyper parameter to the hash of the artifact
+            # because the task ID might differ, but the artifact might be the same
+            hyper_params_to_change[key] = task_.artifacts[artifact].hash
+        except Exception:
+            pass
+hyper_params.update(hyper_params_to_change)
configs = task.get_configuration_objects() if configurations_override is None else configurations_override
# currently we do not add the docker image to the hash (only args and setup script),
# because default docker image will cause the step to change
@@ -604,14 +601,6 @@ class ClearmlJob(BaseJob):
if allow_caching:
# look for a cached copy of the Task
# get parameters + task_overrides + as dict and hash it.
-task_hash_legacy = self._create_task_hash(
-base_temp_task,
-section_overrides=sections,
-params_override=task_params,
-configurations_override=configuration_overrides or None,
-explicit_docker_image=kwargs.get("explicit_docker_image"),
-account_for_artifacts_hashes=False
-)
task_hash = self._create_task_hash(
base_temp_task,
section_overrides=sections,
@@ -619,7 +608,7 @@
configurations_override=configuration_overrides or None,
explicit_docker_image=kwargs.get("explicit_docker_image")
)
-task = self._get_cached_task(task_hash_legacy) or self._get_cached_task(task_hash)
+task = self._get_cached_task(task_hash)
# if we found a task, just use
if task:
if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created:
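The caching above depends on a deterministic hash of the task configuration. A stand-alone sketch of the general idea (not the exact ClearML implementation):

```python
import hashlib
import json

def stable_hash(sections):
    # sort keys so equal configurations always serialize to the same string,
    # then hash the canonical JSON representation
    canonical = json.dumps(sections, sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# equal content (in any key order) maps to the same hash, so a previously
# created Task can be found in the cache and reused instead of cloned
print(stable_hash({"hyper_params": {"lr": 0.1}, "script": {"branch": "main"}}))
```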


@@ -273,8 +273,8 @@ class ScriptRequirements(object):
class _JupyterObserver(object):
_thread = None
-_exit_event = SafeEvent()
-_sync_event = SafeEvent()
+_exit_event = None
+_sync_event = None
_sample_frequency = 30.
_first_sample_frequency = 3.
_jupyter_history_logger = None
@@ -286,6 +286,10 @@
@classmethod
def observer(cls, jupyter_notebook_filename, notebook_name=None, log_history=False):
+if cls._exit_event is None:
+cls._exit_event = SafeEvent()
+if cls._sync_event is None:
+cls._sync_event = SafeEvent()
if cls._thread is not None:
# order of signaling is important!
cls._exit_event.set()
@@ -304,6 +308,8 @@
@classmethod
def signal_sync(cls, *_, **__):
+if cls._sync_event is None:
+return
cls._sync_event.set()
@classmethod
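The change above defers creating the class-level events until they are first needed. A generic sketch of that lazy-initialization pattern, using `threading.Event` instead of ClearML's `SafeEvent`:

```python
import threading

class Observer:
    _exit_event = None  # created lazily, not at import time

    @classmethod
    def start(cls):
        # create the event only when the observer is actually used,
        # avoiding side effects at import time
        if cls._exit_event is None:
            cls._exit_event = threading.Event()
        cls._exit_event.clear()

    @classmethod
    def signal_exit(cls):
        # if start() was never called there is nothing to signal
        if cls._exit_event is None:
            return
        cls._exit_event.set()
```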


@@ -1,3 +1,5 @@
+import gzip
+import io
import json
import yaml
import mimetypes
@@ -38,7 +40,7 @@ try:
except ImportError:
np = None
try:
-from pathlib import Path as pathlib_Path
+from pathlib import Path as pathlib_Path  # noqa
except ImportError:
pathlib_Path = None
@@ -321,6 +323,7 @@ class Artifacts(object):
self._storage_prefix = None
self._task_name = None
self._project_name = None
+self._temp_files_lookup = {}
def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True):
# type: (str, DataFrame, Optional[dict], Union[bool, Sequence[str]]) -> ()
@@ -428,15 +431,15 @@
artifact_type_data.preview = ""
override_filename_ext_in_uri = extension_name or ""
override_filename_in_uri = name + override_filename_ext_in_uri
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
# noinspection PyBroadException
try:
with open(local_filename, "wb") as f:
f.write(serialization_function(artifact_object))
except Exception:
# cleanup and raise exception
-os.unlink(local_filename)
+self._delete_temp_file(local_filename)
raise
artifact_type_data.content_type = mimetypes.guess_type(local_filename)[0]
elif extension_name == ".pkl":
@@ -448,8 +451,8 @@
extension_name, [".npz", ".csv.gz"], ".npz", artifact_type
)
override_filename_in_uri = name + override_filename_ext_in_uri
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
if override_filename_ext_in_uri == ".npz":
artifact_type_data.content_type = "application/numpy"
np.savez_compressed(local_filename, **{name: artifact_object})
@@ -464,11 +467,10 @@
extension_name, [".csv.gz", ".parquet", ".feather", ".pickle"], ".csv.gz", artifact_type
)
override_filename_in_uri = name
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
if override_filename_ext_in_uri == ".csv.gz":
artifact_type_data.content_type = "text/csv"
-artifact_object.to_csv(local_filename, compression=self._compression)
+self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".parquet":
try:
artifact_type_data.content_type = "application/parquet"
@@ -480,7 +482,7 @@
)
)
artifact_type_data.content_type = "text/csv"
-artifact_object.to_csv(local_filename, compression=self._compression)
+self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".feather":
try:
artifact_type_data.content_type = "application/feather"
@@ -492,7 +494,7 @@
)
)
artifact_type_data.content_type = "text/csv"
-artifact_object.to_csv(local_filename, compression=self._compression)
+self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".pickle":
artifact_type_data.content_type = "application/pickle"
artifact_object.to_pickle(local_filename)
@@ -527,8 +529,8 @@
if guessed_type:
artifact_type_data.content_type = guessed_type
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
artifact_object.save(local_filename)
delete_after_upload = True
elif isinstance(artifact_object, dict):
@@ -561,8 +563,9 @@
if serialized_text is not None:
override_filename_in_uri = name + override_filename_ext_in_uri
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
-with open(fd, "w") as f:
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
+with open(local_filename, "w") as f:
f.write(serialized_text)
preview = preview or serialized_text
if len(preview) < self.max_preview_size_bytes:
@@ -599,7 +602,7 @@
files = list(Path(folder).rglob(wildcard))
override_filename_ext_in_uri = '.zip'
override_filename_in_uri = folder.parts[-1] + override_filename_ext_in_uri
-fd, zip_file = mkstemp(
+zip_file = self._push_temp_file(
prefix=quote(folder.parts[-1], safe="") + '.', suffix=override_filename_ext_in_uri
)
try:
@@ -618,8 +621,7 @@
LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact folder {}'.format(
folder, e))
return False
-finally:
-os.close(fd)
artifact_type_data.preview = preview or archive_preview
artifact_object = zip_file
artifact_type = 'archive'
@@ -647,7 +649,7 @@
override_filename_ext_in_uri = '.zip'
override_filename_in_uri = quote(name, safe="") + override_filename_ext_in_uri
common_path = get_common_path(list_files)
-fd, zip_file = mkstemp(
+zip_file = self._push_temp_file(
prefix='artifact_folder.', suffix=override_filename_ext_in_uri
)
try:
@@ -670,8 +672,7 @@
LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact files {}'.format(
artifact_object, e))
return False
-finally:
-os.close(fd)
artifact_type_data.preview = preview or archive_preview
artifact_object = zip_file
artifact_type = 'archive'
@@ -704,15 +705,15 @@
delete_after_upload = True
override_filename_ext_in_uri = ".txt"
override_filename_in_uri = name + override_filename_ext_in_uri
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
# noinspection PyBroadException
try:
with open(local_filename, "wt") as f:
f.write(artifact_object)
except Exception:
# cleanup and raise exception
-os.unlink(local_filename)
+self._delete_temp_file(local_filename)
raise
elif artifact_object is None or (isinstance(artifact_object, str) and artifact_object == ""):
artifact_type = ''
@@ -736,15 +737,15 @@
delete_after_upload = True
override_filename_ext_in_uri = '.pkl'
override_filename_in_uri = name + override_filename_ext_in_uri
-fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_filename = self._push_temp_file(
+prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
# noinspection PyBroadException
try:
with open(local_filename, 'wb') as f:
pickle.dump(artifact_object, f)
except Exception:
# cleanup and raise exception
-os.unlink(local_filename)
+self._delete_temp_file(local_filename)
raise
# verify preview not out of scope:
@@ -875,10 +876,10 @@
override_filename_ext_in_uri = self._save_format
override_filename_in_uri = name
-fd, local_csv = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
-os.close(fd)
+local_csv = self._push_temp_file(
+prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
local_csv = Path(local_csv)
-pd_artifact.to_csv(local_csv.as_posix(), index=False, compression=self._compression)
+self._store_compressed_pd_csv(pd_artifact, local_csv.as_posix(), index=False)
current_sha2, file_sha2 = sha256sum(
local_csv.as_posix(), skip_header=32, block_size=Artifacts._hash_block_size)
if name in self._last_artifacts_upload:
@@ -887,7 +888,7 @@
# nothing to do, we can skip the upload
# noinspection PyBroadException
try:
-local_csv.unlink()
+self._delete_temp_file(local_csv)
except Exception:
pass
return
@@ -944,6 +945,8 @@
"""
from clearml.storage import StorageManager
+local_file = self._pop_temp_file(local_file)
upload_uri = self._task.output_uri or self._task.get_logger().get_default_upload_destination()
if not isinstance(local_file, Path):
local_file = Path(local_file)
@@ -962,7 +965,7 @@
StorageManager.upload_file(local_file.as_posix(), uri, wait_for_upload=True, retries=ev.retries)
if delete_after_upload:
try:
-os.unlink(local_file.as_posix())
+self._delete_temp_file(local_file)
except OSError:
LoggerRoot.get_base_logger().warning('Failed removing temporary {}'.format(local_file))
else:
@@ -1047,9 +1050,84 @@
def _get_storage_uri_prefix(self):
# type: () -> str
-if not self._storage_prefix or self._task_name != self._task.name or self._project_name != self._task.get_project_name():
+if not self._storage_prefix or self._task_name != self._task.name or \
+self._project_name != self._task.get_project_name():
# noinspection PyProtectedMember
self._storage_prefix = self._task._get_output_destination_suffix()
self._task_name = self._task.name
self._project_name = self._task.get_project_name()
return self._storage_prefix
def _store_compressed_pd_csv(self, artifact_object, local_filename, **kwargs):
# bugfix: to make pandas csv.gz consistent file hash we must pass mtime=0
# (otherwise it is encoded and creates new hash every time)
if self._compression == "gzip":
with gzip.GzipFile(local_filename, 'wb', mtime=0) as gzip_file:
try:
pd_version = int(pd.__version__.split(".")[0])
except ValueError:
pd_version = 0
if pd_version >= 2:
artifact_object.to_csv(gzip_file, **kwargs)
else:
# old (pandas<2) versions of pandas cannot handle direct gzip stream, so we manually encode it
artifact_object.to_csv(io.TextIOWrapper(gzip_file), **kwargs)
else:
artifact_object.to_csv(local_filename, compression=self._compression)
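The reason for pinning `mtime=0` can be shown in isolation: gzip stores a timestamp in its header, so compressing identical bytes at different times produces different files and therefore different hashes. A self-contained sketch:

```python
import gzip
import hashlib
import io
import time

def gzip_bytes(payload, mtime):
    buf = io.BytesIO()
    # mtime=0 makes the output depend only on the payload, not on the clock
    with gzip.GzipFile(fileobj=buf, mode="wb", mtime=mtime) as gz:
        gz.write(payload)
    return buf.getvalue()

payload = b"col_a,col_b\n1,2\n"
first = hashlib.sha256(gzip_bytes(payload, mtime=0)).hexdigest()
time.sleep(1)
second = hashlib.sha256(gzip_bytes(payload, mtime=0)).hexdigest()
assert first == second  # stable hash, so "content unchanged" can be detected
```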
def _push_temp_file(self, prefix=None, suffix=None):
"""
Create a temp file with a consistent name (same prefix/suffix convention as mkstemp)
:param prefix: filename prefix (same convention as mkstemp)
:param suffix: filename suffix (same convention as mkstemp)
:return: a consistently named temp file inside a dedicated temp folder, later renamed to a regular temp file
"""
# we want to make sure our temp naming convention is consistent
# this is important for hashing zip files and gz files, because the name of the internal
# file becomes part of the content and then hash changes
# put the file into its own temp folder with a consistent name, because the filename is part of
# the compressed artifact and we want consistency. Once the artifact is built, the compressed file is
# renamed to a regular temp file and the temp folder is deleted
temp_folder = mkdtemp(prefix='artifacts_')
local_filename = Path(temp_folder) / (str(prefix).rstrip(".") + "." + str(suffix).lstrip("."))
local_filename = local_filename.as_posix()
self._temp_files_lookup[local_filename] = (temp_folder, deepcopy(prefix), deepcopy(suffix))
return local_filename
def _pop_temp_file(self, local_filename=None):
"""
Now we need to move the consistent file from the temp folder to the main temp folder,
give it a new temp name, and remove the temp folder
:param local_filename: local file name inside a temp folder, assumed to be a single file in the temp folder
:return: new temp file inside the main temp folder
"""
# convert to posix if Path
if isinstance(local_filename, Path):
local_filename = local_filename.as_posix()
# if this is not our temp file, just do nothing
if local_filename not in self._temp_files_lookup:
return local_filename
# move file out of temp folder
try:
temp_folder, prefix, suffix = self._temp_files_lookup.pop(local_filename)
fd, temp_filename = mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)
os.replace(local_filename, temp_filename)
local_filename = temp_filename
os.rmdir(temp_folder)
except Exception as ex:
raise ValueError("Failed storing temp artifact into {}: error: {}".format(local_filename, ex))
return temp_filename
def _delete_temp_file(self, local_filename):
# resolve the temp path (if it is one of ours) and remove the file
local_filename = self._pop_temp_file(local_filename)
os.unlink(local_filename)
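A similar concern motivates the consistent temp-file naming: a zip archive records the internal file name, so zipping the same content under a random `mkstemp` name would change the archive bytes (and hash) on every run. A stand-alone illustration:

```python
import hashlib
import io
import zipfile

def zip_single_file(internal_name, payload):
    buf = io.BytesIO()
    # the archive stores `internal_name`, so it contributes to the hash;
    # ZipInfo uses a fixed default timestamp, keeping the output deterministic
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(zipfile.ZipInfo(internal_name), payload)
    return hashlib.sha256(buf.getvalue()).hexdigest()

payload = b"same content"
print(zip_single_file("data.txt", payload) == zip_single_file("data.txt", payload))        # True
print(zip_single_file("tmp_abc123.txt", payload) == zip_single_file("data.txt", payload))  # False
```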


@@ -89,6 +89,7 @@ class SimpleQueueWrapper(object):
class PatchOsFork(object):
_original_fork = None
+_registered_fork_callbacks = False
_current_task = None
_original_process_run = None
@@ -104,13 +105,20 @@
# noinspection PyBroadException
try:
# only once
-if cls._original_fork:
+if cls._registered_fork_callbacks or cls._original_fork:
return
-if six.PY2:
-cls._original_fork = staticmethod(os.fork)
-else:
-cls._original_fork = os.fork
-os.fork = cls._patched_fork
+try:
+os.register_at_fork(before=PatchOsFork._fork_callback_before,
+after_in_child=PatchOsFork._fork_callback_after_child)
+cls._registered_fork_callbacks = True
+except Exception:
+# python <3.6
+if six.PY2:
+cls._original_fork = staticmethod(os.fork)
+else:
+cls._original_fork = os.fork
+os.fork = cls._patched_fork
except Exception:
pass
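For readers unfamiliar with it, `os.register_at_fork` (Python 3.7+, POSIX only) lets a library hook `fork()` without monkey-patching `os.fork`. A minimal sketch of the mechanism:

```python
import os

def _before_fork():
    # runs in the parent, just before fork()
    print("parent about to fork, pid=%d" % os.getpid())

def _after_fork_child():
    # runs in the child right after fork(); a natural place to restart
    # background reporting threads, which do not survive a fork
    print("child re-initializing, pid=%d" % os.getpid())

os.register_at_fork(before=_before_fork, after_in_child=_after_fork_child)

pid = os.fork()
if pid == 0:
    os._exit(0)  # child exits immediately
else:
    os.waitpid(pid, 0)
```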
@@ -182,10 +190,9 @@
pass
@staticmethod
-def _patched_fork(*args, **kwargs):
+def _fork_callback_before():
if not PatchOsFork._current_task:
-return PatchOsFork._original_fork(*args, **kwargs)
+return
from ..task import Task
# ensure deferred is done, but never try to generate a Task object
@@ -195,46 +202,63 @@
# noinspection PyProtectedMember
Task._wait_for_deferred(task)
@staticmethod
def _fork_callback_after_child():
if not PatchOsFork._current_task:
return
from ..task import Task
# force creating a Task
task = Task.current_task()
if not task:
return
PatchOsFork._current_task = task
# # Hack: now make sure we setup the reporter threads (Log+Reporter)
# noinspection PyProtectedMember
if not bool(task._report_subprocess_enabled):
BackgroundMonitor.start_all(task=task)
# The signal handler method is Not enough, for the time being, we have both
# even though it makes little sense
# # if we got here patch the os._exit of our instance to call us
def _at_exit_callback(*a_args, **a_kwargs):
# just make sure we flush the internal state (the at exist caught by the external signal does the rest
# in theory we should not have to do any of that, but for some reason if we do not
# the signal is never caught by the signal call backs, not sure why....
sleep(0.1)
# Since at_exist handlers do not work on forked processes, we have to manually call them here
if task:
try:
# not to worry there is a double _at_exit protection implemented inside task._at_exit()
# noinspection PyProtectedMember
task._at_exit()
except: # noqa
pass
# noinspection PyProtectedMember, PyUnresolvedReferences
return os._org_exit(*a_args, **a_kwargs)
if not hasattr(os, '_org_exit'):
# noinspection PyProtectedMember, PyUnresolvedReferences
os._org_exit = os._exit
os._exit = _at_exit_callback
+@staticmethod
+def _patched_fork(*args, **kwargs):
+if not PatchOsFork._current_task:
+return PatchOsFork._original_fork(*args, **kwargs)
+PatchOsFork._fork_callback_before()
ret = PatchOsFork._original_fork(*args, **kwargs)
if not PatchOsFork._current_task:
return ret
# Make sure the new process stdout is logged
if not ret:
-# force creating a Task
-task = Task.current_task()
-if not task:
-return ret
-PatchOsFork._current_task = task
-# # Hack: now make sure we setup the reporter threads (Log+Reporter)
-# noinspection PyProtectedMember
-if not bool(task._report_subprocess_enabled):
-BackgroundMonitor.start_all(task=task)
-# The signal handler method is Not enough, for the time being, we have both
-# even though it makes little sense
-# # if we got here patch the os._exit of our instance to call us
-def _at_exit_callback(*a_args, **a_kwargs):
-# just make sure we flush the internal state (the at exist caught by the external signal does the rest
-# in theory we should not have to do any of that, but for some reason if we do not
-# the signal is never caught by the signal call backs, not sure why....
-sleep(0.1)
-# Since at_exist handlers do not work on forked processes, we have to manually call them here
-if task:
-try:
-# not to worry there is a double _at_exit protection implemented inside task._at_exit()
-# noinspection PyProtectedMember
-task._at_exit()
-except: # noqa
-pass
-# noinspection PyProtectedMember, PyUnresolvedReferences
-return os._org_exit(*a_args, **a_kwargs)
-if not hasattr(os, '_org_exit'):
-# noinspection PyProtectedMember, PyUnresolvedReferences
-os._org_exit = os._exit
-os._exit = _at_exit_callback
+PatchOsFork._fork_callback_after_child()
return ret


@@ -8,6 +8,7 @@ from . import _patched_call
from .tensorflow_bind import WeightsGradientHistHelper
from ..import_bind import PostImportHookPatching
from ...debugging.log import LoggerRoot
+from .tensorflow_bind import IsTensorboardInit
try:
import fastai
@@ -51,7 +52,7 @@ class PatchFastaiV1(object):
@staticmethod
def patch_model_callback():
# if you have tensorboard, we assume you use TensorboardLogger, which we catch, so no need to patch.
-if "tensorboard" in sys.modules:
+if "tensorboard" in sys.modules and IsTensorboardInit.tensorboard_used():
return
try:
@@ -191,7 +192,7 @@ class PatchFastaiV2(object):
@staticmethod
def patch_model_callback():
-if "tensorboard" in sys.modules:
+if "tensorboard" in sys.modules and IsTensorboardInit.tensorboard_used():
return
# noinspection PyBroadException


@@ -1,7 +1,7 @@
import io
import sys
from functools import partial
+import yaml
from ..config import running_remotely, get_remote_task_id, DEV_TASK_NO_REUSE
from ..debugging.log import LoggerRoot
@@ -81,7 +81,14 @@ class PatchHydra(object):
stored_config = {k[len(PatchHydra._parameter_section)+1:]: v for k, v in full_parameters.items()
if k.startswith(PatchHydra._parameter_section+'/')}
stored_config.pop(PatchHydra._parameter_allow_full_edit, None)
-overrides = ['{}={}'.format(k, v) for k, v in stored_config.items()]
+# noinspection PyBroadException
+try:
+overrides = yaml.safe_load(full_parameters.get("Args/overrides", "")) or []
+except Exception:
+overrides = []
+if overrides and not isinstance(overrides, (list, tuple)):
+overrides = [overrides]
+overrides += ['{}={}'.format(k, v) for k, v in stored_config.items()]
else:
# We take care of it inside the _patched_run_job
pass
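The `Args/overrides` parameter holds the Hydra override list serialized as a YAML/JSON-style string, which is why `yaml.safe_load` is used to turn it back into a Python list. Roughly (the stored string below is an illustrative assumption):

```python
import yaml

stored = "['training.epochs=10', 'model.name=resnet50']"
overrides = yaml.safe_load(stored) or []
# a single scalar override is normalized into a one-element list
if overrides and not isinstance(overrides, (list, tuple)):
    overrides = [overrides]
print(overrides)  # ['training.epochs=10', 'model.name=resnet50']
```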
@@ -119,7 +126,8 @@
else:
PatchHydra._last_untracked_state['connect'] = dict(
mutable=stored_config, name=PatchHydra._parameter_section)
-# todo: remove the overrides section from the Args (we have it here)
+# Maybe ?! remove the overrides section from the Args (we have it here)
+# But when used with a Pipeline this is the only section we get... so we leave it here anyhow
# PatchHydra._current_task.delete_parameter('Args/overrides')
except Exception:
pass


@@ -159,6 +159,8 @@
LoggerRoot.get_base_logger().warning(
"Setting non-semantic dataset version '{}'".format(self._dataset_version)
)
+if dataset_name == "":
+raise ValueError("`dataset_name` cannot be an empty string")
if task:
self._task_pinger = None
self._created_task = False
@@ -1884,7 +1886,8 @@
Query list of dataset in the system
:param dataset_project: Specify dataset project name
-:param partial_name: Specify partial match to a dataset name
+:param partial_name: Specify partial match to a dataset name. This method supports regular expressions for name
+matching (if you wish to match special characters and avoid any regex behaviour, use re.escape())
:param tags: Specify user tags
:param ids: List specific dataset based on IDs list
:param only_completed: If False, return datasets that are still in progress (uploading/edited etc.)
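Because `partial_name` is treated as a regular expression, names containing regex metacharacters should be escaped. A small usage sketch (project and dataset names are placeholders):

```python
import re
from clearml import Dataset

# "v1.2 (raw)" contains regex metacharacters, so escape it to match literally
datasets = Dataset.list_datasets(
    dataset_project="examples",
    partial_name=re.escape("v1.2 (raw)"),
    only_completed=True,
)
for d in datasets:
    # each entry is a dict describing a dataset (id, name, project, tags, ...)
    print(d["id"], d["name"])
```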


@ -357,6 +357,9 @@ class BaseModel(object):
self._task = None self._task = None
self._reload_required = False self._reload_required = False
self._reporter = None self._reporter = None
self._floating_data = None
self._name = None
self._task_connect_name = None
self._set_task(task) self._set_task(task)
def get_weights(self, raise_on_error=False, force_download=False): def get_weights(self, raise_on_error=False, force_download=False):
@ -1055,6 +1058,7 @@ class BaseModel(object):
def _init_reporter(self): def _init_reporter(self):
if self._reporter: if self._reporter:
return return
self._base_model = self._get_force_base_model()
metrics_manager = Metrics( metrics_manager = Metrics(
session=_Model._get_default_session(), session=_Model._get_default_session(),
storage_uri=None, storage_uri=None,
@ -1126,6 +1130,8 @@ class BaseModel(object):
:return: True if the metadata was set and False otherwise :return: True if the metadata was set and False otherwise
""" """
if not self._base_model:
self._base_model = self._get_force_base_model()
self._reload_required = ( self._reload_required = (
_Model._get_default_session() _Model._get_default_session()
.send( .send(
@ -1167,6 +1173,8 @@ class BaseModel(object):
:return: String representation of the value of the metadata entry or None if the entry was not found :return: String representation of the value of the metadata entry or None if the entry was not found
""" """
if not self._base_model:
self._base_model = self._get_force_base_model()
self._reload_if_required() self._reload_if_required()
return self.get_all_metadata().get(str(key), {}).get("value") return self.get_all_metadata().get(str(key), {}).get("value")
@ -1180,6 +1188,8 @@ class BaseModel(object):
:return: The value of the metadata entry, casted to its type (if not possible, :return: The value of the metadata entry, casted to its type (if not possible,
the string representation will be returned) or None if the entry was not found the string representation will be returned) or None if the entry was not found
""" """
if not self._base_model:
self._base_model = self._get_force_base_model()
key = str(key) key = str(key)
metadata = self.get_all_metadata() metadata = self.get_all_metadata()
if key not in metadata: if key not in metadata:
@@ -1194,6 +1204,8 @@ class BaseModel(object):
         :return: Get all metadata as a dictionary of format Dict[key, Dict[value, type]]. The key, value and type
             entries are all strings. Note that each entry might have an additional 'key' entry, repeating the key
         """
+        if not self._base_model:
+            self._base_model = self._get_force_base_model()
         self._reload_if_required()
         return self._get_model_data().metadata or {}
@@ -1204,6 +1216,8 @@ class BaseModel(object):
             entries are strings. The value is cast to its type if possible. Note that each entry might
             have an additional 'key' entry, repeating the key
         """
+        if not self._base_model:
+            self._base_model = self._get_force_base_model()
         self._reload_if_required()
         result = {}
         metadata = self.get_all_metadata()
@@ -1224,6 +1238,8 @@ class BaseModel(object):
         :return: True if the metadata was set and False otherwise
         """
+        if not self._base_model:
+            self._base_model = self._get_force_base_model()
         metadata_array = [
             {
                 "key": str(k),
@@ -1249,6 +1265,74 @@ class BaseModel(object):
             self._get_base_model().reload()
         self._reload_required = False

+    def _update_base_model(self, model_name=None, task_model_entry=None):
+        if not self._task:
+            return self._base_model
+        # update the model from the task inputs
+        labels = self._task.get_labels_enumeration()
+        # noinspection PyProtectedMember
+        config_text = self._task._get_model_config_text()
+        model_name = (
+            model_name or self._name or (self._floating_data.name if self._floating_data else None) or self._task.name
+        )
+        # noinspection PyBroadException
+        try:
+            task_model_entry = (
+                task_model_entry
+                or self._task_connect_name
+                or Path(self._get_model_data().uri).stem
+            )
+        except Exception:
+            pass
+        parent = self._task.input_models_id.get(task_model_entry)
+        self._base_model.update(
+            labels=(self._floating_data.labels if self._floating_data else None) or labels,
+            design=(self._floating_data.design if self._floating_data else None) or config_text,
+            task_id=self._task.id,
+            project_id=self._task.project,
+            parent_id=parent,
+            name=model_name,
+            comment=self._floating_data.comment if self._floating_data else None,
+            tags=self._floating_data.tags if self._floating_data else None,
+            framework=self._floating_data.framework if self._floating_data else None,
+            upload_storage_uri=self._floating_data.upload_storage_uri if self._floating_data else None,
+        )
+        # remove model floating change set, by now they should have matched the task.
+        self._floating_data = None
+        # now we have to update the creator task so it points to us
+        if str(self._task.status) not in (
+            str(self._task.TaskStatusEnum.created),
+            str(self._task.TaskStatusEnum.in_progress),
+        ):
+            self._log.warning(
+                "Could not update last created model in Task {}, "
+                "Task status '{}' cannot be updated".format(
+                    self._task.id, self._task.status
+                )
+            )
+        elif task_model_entry:
+            self._base_model.update_for_task(
+                task_id=self._task.id,
+                model_id=self.id,
+                type_="output",
+                name=task_model_entry,
+            )
+        return self._base_model
+
+    def _get_force_base_model(self, model_name=None, task_model_entry=None):
+        if self._base_model:
+            return self._base_model
+        if not self._task:
+            return None
+        # create a new model from the task
+        # noinspection PyProtectedMember
+        self._base_model = self._task._get_output_model(model_id=None)
+        return self._update_base_model(model_name=model_name, task_model_entry=task_model_entry)
+

 class Model(BaseModel):
     """
@@ -2060,6 +2144,7 @@ class OutputModel(BaseModel):
         self._base_model = None
         self._base_model_id = None
         self._task_connect_name = None
+        self._name = name
         self._label_enumeration = label_enumeration
         # noinspection PyProtectedMember
         self._floating_data = create_dummy_model(
@@ -2300,7 +2385,11 @@ class OutputModel(BaseModel):
             if out_model_file_name
             else (self._task_connect_name or "Output Model")
         )
-        model = self._get_force_base_model(task_model_entry=name)
+        if not self._base_model:
+            model = self._get_force_base_model(task_model_entry=name)
+        else:
+            self._update_base_model(task_model_entry=name)
+            model = self._base_model
         if not model:
             raise ValueError("Failed creating internal output model")
@@ -2639,61 +2728,6 @@ class OutputModel(BaseModel):
         )
         return weights_filename_offline or register_uri

-    def _get_force_base_model(self, model_name=None, task_model_entry=None):
-        if self._base_model:
-            return self._base_model
-        # create a new model from the task
-        # noinspection PyProtectedMember
-        self._base_model = self._task._get_output_model(model_id=None)
-        # update the model from the task inputs
-        labels = self._task.get_labels_enumeration()
-        # noinspection PyProtectedMember
-        config_text = self._task._get_model_config_text()
-        model_name = model_name or self._floating_data.name or self._task.name
-        task_model_entry = (
-            task_model_entry
-            or self._task_connect_name
-            or Path(self._get_model_data().uri).stem
-        )
-        parent = self._task.input_models_id.get(task_model_entry)
-        self._base_model.update(
-            labels=self._floating_data.labels or labels,
-            design=self._floating_data.design or config_text,
-            task_id=self._task.id,
-            project_id=self._task.project,
-            parent_id=parent,
-            name=model_name,
-            comment=self._floating_data.comment,
-            tags=self._floating_data.tags,
-            framework=self._floating_data.framework,
-            upload_storage_uri=self._floating_data.upload_storage_uri,
-        )
-        # remove model floating change set, by now they should have matched the task.
-        self._floating_data = None
-        # now we have to update the creator task so it points to us
-        if str(self._task.status) not in (
-            str(self._task.TaskStatusEnum.created),
-            str(self._task.TaskStatusEnum.in_progress),
-        ):
-            self._log.warning(
-                "Could not update last created model in Task {}, "
-                "Task status '{}' cannot be updated".format(
-                    self._task.id, self._task.status
-                )
-            )
-        else:
-            self._base_model.update_for_task(
-                task_id=self._task.id,
-                model_id=self.id,
-                type_="output",
-                name=task_model_entry,
-            )
-        return self._base_model
-
     def _get_base_model(self):
         if self._floating_data:
             return self._floating_data
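The net effect of the model.py hunks above: `_get_force_base_model()` moves from `OutputModel` up to `BaseModel`, gains a separate `_update_base_model()` helper, and the metadata accessors lazily create the backing model entry on first use. A rough sketch of how this surfaces through the public API; the project, task and model names are hypothetical, and the `set_metadata()` value/type arguments follow the docstrings touched in this diff:

```python
from clearml import Task, OutputModel

task = Task.init(project_name="examples", task_name="model metadata demo")

# `name` is stored on the OutputModel (the new `self._name = name` above)
model = OutputModel(task=task, name="my model", framework="PyTorch")

# Metadata can be set before any weights are registered: the accessor
# lazily calls _get_force_base_model() to create the backend model entry.
model.set_metadata("input_size", "224", "int")
print(model.get_all_metadata())  # e.g. {'input_size': {'key': 'input_size', 'value': '224', 'type': 'int'}}
```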


@@ -504,12 +504,16 @@ class _Boto3Driver(_Driver):
             'ContentType': get_file_mimetype(object_name)
         }
         extra_args.update(container.config.extra_args or {})
-        container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig(
-            use_threads=container.config.multipart,
-            max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-            num_download_attempts=container.config.retries,
-            multipart_threshold=self._multipart_threshold,
-            multipart_chunksize=self._multipart_chunksize),
+        container.bucket.upload_fileobj(
+            stream,
+            object_name,
+            Config=boto3.s3.transfer.TransferConfig(
+                use_threads=container.config.multipart,
+                max_concurrency=int(self._max_multipart_concurrency) if container.config.multipart else 1,
+                num_download_attempts=container.config.retries,
+                multipart_threshold=int(self._multipart_threshold),
+                multipart_chunksize=int(self._multipart_chunksize),
+            ),
             Callback=callback,
             ExtraArgs=extra_args,
         )
@@ -523,8 +527,8 @@ class _Boto3Driver(_Driver):
                 Config=boto3.s3.transfer.TransferConfig(
                     use_threads=False,
                     num_download_attempts=container.config.retries,
-                    multipart_threshold=self._multipart_threshold,
-                    multipart_chunksize=self._multipart_chunksize,
+                    multipart_threshold=int(self._multipart_threshold),
+                    multipart_chunksize=int(self._multipart_chunksize),
                 ),
                 Callback=callback,
                 ExtraArgs=extra_args
@@ -545,12 +549,16 @@ class _Boto3Driver(_Driver):
             'ContentType': get_file_mimetype(object_name or file_path)
         }
         extra_args.update(container.config.extra_args or {})
-        container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig(
-            use_threads=container.config.multipart,
-            max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-            num_download_attempts=container.config.retries,
-            multipart_threshold=self._multipart_threshold,
-            multipart_chunksize=self._multipart_chunksize),
+        container.bucket.upload_file(
+            file_path,
+            object_name,
+            Config=boto3.s3.transfer.TransferConfig(
+                use_threads=container.config.multipart,
+                max_concurrency=int(self._max_multipart_concurrency) if container.config.multipart else 1,
+                num_download_attempts=container.config.retries,
+                multipart_threshold=int(self._multipart_threshold),
+                multipart_chunksize=int(self._multipart_chunksize),
+            ),
             Callback=callback,
             ExtraArgs=extra_args,
         )
@@ -564,8 +572,8 @@ class _Boto3Driver(_Driver):
                 Config=boto3.s3.transfer.TransferConfig(
                     use_threads=False,
                     num_download_attempts=container.config.retries,
-                    multipart_threshold=self._multipart_threshold,
-                    multipart_chunksize=self._multipart_chunksize
+                    multipart_threshold=int(self._multipart_threshold),
+                    multipart_chunksize=int(self._multipart_chunksize)
                 ),
                 Callback=callback,
                 ExtraArgs=extra_args
@@ -617,10 +625,11 @@ class _Boto3Driver(_Driver):
         container = self._containers[obj.container_name]
         config = boto3.s3.transfer.TransferConfig(
             use_threads=container.config.multipart,
-            max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
+            max_concurrency=int(self._max_multipart_concurrency) if container.config.multipart else 1,
             num_download_attempts=container.config.retries,
-            multipart_threshold=self._multipart_threshold,
-            multipart_chunksize=self._multipart_chunksize)
+            multipart_threshold=int(self._multipart_threshold),
+            multipart_chunksize=int(self._multipart_chunksize),
+        )
         total_size_mb = obj.content_length / (1024. * 1024.)
         remote_path = os.path.join(obj.container_name, obj.key)
         cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log)
@@ -637,10 +646,10 @@ class _Boto3Driver(_Driver):
         container = self._containers[obj.container_name]
         Config = boto3.s3.transfer.TransferConfig(
             use_threads=container.config.multipart,
-            max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
+            max_concurrency=int(self._max_multipart_concurrency) if container.config.multipart else 1,
             num_download_attempts=container.config.retries,
-            multipart_threshold=self._multipart_threshold,
-            multipart_chunksize=self._multipart_chunksize
+            multipart_threshold=int(self._multipart_threshold),
+            multipart_chunksize=int(self._multipart_chunksize)
         )
         obj.download_file(str(p), Callback=callback, Config=Config)
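The storage hunks above wrap `multipart_threshold`, `multipart_chunksize` and `max_concurrency` in `int()` before handing them to `boto3.s3.transfer.TransferConfig`; values read from configuration can arrive as floats or strings, while the transfer machinery expects integers. A standalone sketch of the same construction, with illustrative values and a hypothetical bucket:

```python
import boto3
from boto3.s3.transfer import TransferConfig

# Values as they might come out of a parsed config (not guaranteed to be ints)
multipart_threshold = 5 * 1024 * 1024 * 1.0   # float
multipart_chunksize = 5 * 1024 * 1024 * 1.0   # float
max_concurrency = "16"                        # string, e.g. from an env var

config = TransferConfig(
    use_threads=True,
    max_concurrency=int(max_concurrency),
    num_download_attempts=5,
    multipart_threshold=int(multipart_threshold),
    multipart_chunksize=int(multipart_chunksize),
)

s3 = boto3.resource("s3")
s3.Bucket("my-bucket").upload_file("model.pkl", "models/model.pkl", Config=config)
```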


@@ -971,7 +971,8 @@ class Task(_Task):
             Use a list of strings for multiple optional project names.
         :param str task_name: The full name or partial name of the Tasks to match within the specified
             ``project_name`` (or all projects if ``project_name`` is ``None``).
-            This method supports regular expressions for name matching. (Optional)
+            This method supports regular expressions for name matching (if you wish to match special characters and
+            avoid any regex behaviour, use re.escape()). (Optional)
             To match an exact task name (i.e. not partial matching),
             add ^/$ at the beginning/end of the string, for example: "^exact_task_name_here$"
         :param list tags: Filter based on the requested list of tags (strings) (Task must have all the listed tags)
@@ -1020,7 +1021,8 @@ class Task(_Task):
             Use a list of strings for multiple optional project names.
         :param str task_name: The full name or partial name of the Tasks to match within the specified
             ``project_name`` (or all projects if ``project_name`` is ``None``).
-            This method supports regular expressions for name matching. (Optional)
+            This method supports regular expressions for name matching (if you wish to match special characters and
+            avoid any regex behaviour, use re.escape()). (Optional)
         :param str project_name: project name (str) the task belongs to (use None for all projects)
         :param str task_name: task name (str) within the selected project
             Return any partial match of task_name, regular expressions matching is also supported.
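Both docstring hunks above make the same point: `task_name` is matched as a regular expression. A short sketch of the resulting matching behaviour, with hypothetical project and task names:

```python
import re

from clearml import Task

# Partial, regex-based match: any task whose name contains "train"
tasks = Task.get_tasks(project_name="examples", task_name="train")

# Exact, literal match: anchor the pattern and escape regex metacharacters
exact = Task.get_tasks(
    project_name="examples",
    task_name="^{}$".format(re.escape("train (v2.1)")),
)
```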
@@ -2776,7 +2778,8 @@ class Task(_Task):
         # leave this process.
         if exit_process:
-            LoggerRoot.get_base_logger().warning('Terminating local execution process')
+            LoggerRoot.get_base_logger().warning(
+                'ClearML Terminating local execution process - continuing execution remotely')
             leave_process(0)

         return task

File diff suppressed because it is too large


@@ -303,7 +303,8 @@ class WrapperBase(type):
         '__repr__', '__reversed__', '__rfloorfiv__', '__rlshift__', '__rmod__',
         '__rmul__', '__ror__', '__rpow__', '__rrshift__', '__rshift__', '__rsub__',
         '__rtruediv__', '__rxor__', '__setitem__', '__setslice__', '__sub__',
         '__truediv__', '__xor__', 'next', '__str__', '__repr__',
+        '__round__', '__fspath__', '__bytes__', '__index__'
     ]

     def __new__(mcs, classname, bases, attrs):
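The hunk above extends the list of magic methods the wrapper metaclass forwards to the wrapped object. A generic illustration (not ClearML's actual wrapper) of why methods like `__fspath__` and `__index__` need explicit forwarding: Python looks dunders up on the type, bypassing `__getattr__`:

```python
import os


class Proxy:
    """Minimal stand-in for a wrapper that forwards magic methods to a wrapped value."""

    def __init__(self, value):
        self._value = value

    # Without explicit forwarding, os.fspath() / sequence indexing would fail on
    # the proxy, because dunder lookups go through the type, not the instance.
    def __fspath__(self):
        return os.fspath(self._value)

    def __index__(self):
        return self._value.__index__()


print(os.fspath(Proxy("/tmp/model.pkl")))  # -> /tmp/model.pkl
print(list(range(5))[Proxy(2)])            # -> 2
```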


@@ -1 +1 @@
-__version__ = '1.12.0'
+__version__ = '1.12.2'