revital
2025-01-22 15:20:44 +02:00
35 changed files with 1725 additions and 339 deletions


@@ -55,7 +55,7 @@ a project may be further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at <conduct@allegro.ai>. All
reported by contacting the project team at <conduct@clear.ml>. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.


@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2024 ClearML
Copyright 2025 ClearML Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.


@@ -7,8 +7,8 @@
**[ClearML](https://clear.ml) - Auto-Magical Suite of tools to streamline your AI workflow
</br>Experiment Manager, MLOps/LLMOps and Data-Management**
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml.svg)](https://img.shields.io/github/license/allegroai/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)<br>
[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
[![GitHub license](https://img.shields.io/github/license/clearml/clearml.svg)](https://img.shields.io/github/license/clearml/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)<br>
[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/clearml)](https://artifacthub.io/packages/search?repo=clearml) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
`🌟 ClearML is open-source - Leave a star to support the project! 🌟`
@@ -17,21 +17,20 @@
---
### ClearML
<sup>*Formerly known as Allegro Trains*</sup>
ClearML is an ML/DL development and production suite. It contains FIVE main modules:
- [Experiment Manager](#clearml-experiment-manager) - Automagical experiment tracking, environments and results
- [MLOps / LLMOps](https://github.com/allegroai/clearml-agent) - Orchestration, Automation & Pipelines solution for ML/DL/GenAI jobs (Kubernetes / Cloud / bare-metal)
- [Data-Management](https://github.com/allegroai/clearml/blob/master/docs/datasets.md) - Fully differentiable data management & version control solution on top of object-storage
- [MLOps / LLMOps](https://github.com/clearml/clearml-agent) - Orchestration, Automation & Pipelines solution for ML/DL/GenAI jobs (Kubernetes / Cloud / bare-metal)
- [Data-Management](https://github.com/clearml/clearml/blob/master/docs/datasets.md) - Fully differentiable data management & version control solution on top of object-storage
(S3 / GS / Azure / NAS)
- [Model-Serving](https://github.com/allegroai/clearml-serving) - *cloud-ready* Scalable model serving solution!
- [Model-Serving](https://github.com/clearml/clearml-serving) - *cloud-ready* Scalable model serving solution!
- **Deploy new model endpoints in under 5 minutes**
- Includes optimized GPU serving support backed by Nvidia-Triton
- **with out-of-the-box Model Monitoring**
- [Reports](https://clear.ml/docs/latest/docs/webapp/webapp_reports) - Create and share rich MarkDown documents supporting embeddable online content
- :fire: [Orchestration Dashboard](https://clear.ml/docs/latest/docs/webapp/webapp_orchestration_dash/) - Live rich dashboard for your entire compute cluster (Cloud / Kubernetes / On-Prem)
- **NEW** 💥 [Fractional GPUs](https://github.com/allegroai/clearml-fractional-gpu) - Container based, driver level GPU memory limitation 🙀 !!!
- **NEW** 💥 [Fractional GPUs](https://github.com/clearml/clearml-fractional-gpu) - Container based, driver level GPU memory limitation 🙀 !!!
Instrumenting these components is the **ClearML-server**, see [Self-Hosting](https://clear.ml/docs/latest/docs/deploying_clearml/clearml_server) & [Free tier Hosting](https://app.clear.ml)
@@ -48,20 +47,20 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
<table>
<tbody>
<tr>
<td><a href="https://github.com/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_1_Experiment_Management.ipynb"><b>Step 1</b></a> - Experiment Management</td>
<td><a target="_blank" href="https://colab.research.google.com/github/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_1_Experiment_Management.ipynb">
<td><a href="https://github.com/clearml/clearml/blob/master/docs/tutorials/Getting_Started_1_Experiment_Management.ipynb"><b>Step 1</b></a> - Experiment Management</td>
<td><a target="_blank" href="https://colab.research.google.com/github/clearml/clearml/blob/master/docs/tutorials/Getting_Started_1_Experiment_Management.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a></td>
</tr>
<tr>
<td><a href="https://github.com/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_2_Setting_Up_Agent.ipynb"><b>Step 2</b></a> - Remote Execution Agent Setup</td>
<td><a target="_blank" href="https://colab.research.google.com/github/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_2_Setting_Up_Agent.ipynb">
<td><a href="https://github.com/clearml/clearml/blob/master/docs/tutorials/Getting_Started_2_Setting_Up_Agent.ipynb"><b>Step 2</b></a> - Remote Execution Agent Setup</td>
<td><a target="_blank" href="https://colab.research.google.com/github/clearml/clearml/blob/master/docs/tutorials/Getting_Started_2_Setting_Up_Agent.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a></td>
</tr>
<tr>
<td><a href="https://github.com/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_3_Remote_Execution.ipynb"><b>Step 3</b></a> - Remotely Execute Tasks</td>
<td><a target="_blank" href="https://colab.research.google.com/github/allegroai/clearml/blob/master/docs/tutorials/Getting_Started_3_Remote_Execution.ipynb">
<td><a href="https://github.com/clearml/clearml/blob/master/docs/tutorials/Getting_Started_3_Remote_Execution.ipynb"><b>Step 3</b></a> - Remotely Execute Tasks</td>
<td><a target="_blank" href="https://colab.research.google.com/github/clearml/clearml/blob/master/docs/tutorials/Getting_Started_3_Remote_Execution.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a></td>
</tr>
@@ -79,8 +78,8 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
<td>Datasets</td>
</tr>
<tr>
<td><a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/experiment_manager.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/datasets"><img src="https://github.com/allegroai/clearml/blob/master/docs/datasets.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml"><img src="https://github.com/clearml/clearml/blob/master/docs/experiment_manager.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/datasets"><img src="https://github.com/clearml/clearml/blob/master/docs/datasets.gif?raw=true" width="100%"></a></td>
</tr>
<tr>
<td colspan="2" height="24px"></td>
@@ -90,8 +89,8 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
<td>Pipelines</td>
</tr>
<tr>
<td><a href="https://app.clear.ml/workers-and-queues/autoscalers"><img src="https://github.com/allegroai/clearml/blob/master/docs/orchestration.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/pipelines"><img src="https://github.com/allegroai/clearml/blob/master/docs/pipelines.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/workers-and-queues/autoscalers"><img src="https://github.com/clearml/clearml/blob/master/docs/orchestration.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/pipelines"><img src="https://github.com/clearml/clearml/blob/master/docs/pipelines.gif?raw=true" width="100%"></a></td>
</tr>
</tbody>
</table>
@@ -115,13 +114,13 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
* Resource Monitoring (CPU/GPU utilization, temperature, IO, network, etc.)
* Model snapshots (With optional automatic upload to central storage: Shared folder, S3, GS, Azure, Http)
* Artifacts log & store (Shared folder, S3, GS, Azure, Http)
* Tensorboard/[TensorboardX](https://github.com/allegroai/clearml/tree/master/examples/frameworks/tensorboardx) scalars, metrics, histograms, **images, audio and video samples**
* [Matplotlib & Seaborn](https://github.com/allegroai/clearml/tree/master/examples/frameworks/matplotlib)
* Tensorboard/[TensorboardX](https://github.com/clearml/clearml/tree/master/examples/frameworks/tensorboardx) scalars, metrics, histograms, **images, audio and video samples**
* [Matplotlib & Seaborn](https://github.com/clearml/clearml/tree/master/examples/frameworks/matplotlib)
* [ClearML Logger](https://clear.ml/docs/latest/docs/fundamentals/logger) interface for complete flexibility.
* Extensive platform support and integrations
* Supported ML/DL frameworks: [PyTorch](https://github.com/allegroai/clearml/tree/master/examples/frameworks/pytorch) (incl' [ignite](https://github.com/allegroai/clearml/tree/master/examples/frameworks/ignite) / [lightning](https://github.com/allegroai/clearml/tree/master/examples/frameworks/pytorch-lightning)), [Tensorflow](https://github.com/allegroai/clearml/tree/master/examples/frameworks/tensorflow), [Keras](https://github.com/allegroai/clearml/tree/master/examples/frameworks/keras), [AutoKeras](https://github.com/allegroai/clearml/tree/master/examples/frameworks/autokeras), [FastAI](https://github.com/allegroai/clearml/tree/master/examples/frameworks/fastai), [XGBoost](https://github.com/allegroai/clearml/tree/master/examples/frameworks/xgboost), [LightGBM](https://github.com/allegroai/clearml/tree/master/examples/frameworks/lightgbm), [MegEngine](https://github.com/allegroai/clearml/tree/master/examples/frameworks/megengine) and [Scikit-Learn](https://github.com/allegroai/clearml/tree/master/examples/frameworks/scikit-learn)
* Supported ML/DL frameworks: [PyTorch](https://github.com/clearml/clearml/tree/master/examples/frameworks/pytorch) (incl' [ignite](https://github.com/clearml/clearml/tree/master/examples/frameworks/ignite) / [lightning](https://github.com/clearml/clearml/tree/master/examples/frameworks/pytorch-lightning)), [Tensorflow](https://github.com/clearml/clearml/tree/master/examples/frameworks/tensorflow), [Keras](https://github.com/clearml/clearml/tree/master/examples/frameworks/keras), [AutoKeras](https://github.com/clearml/clearml/tree/master/examples/frameworks/autokeras), [FastAI](https://github.com/clearml/clearml/tree/master/examples/frameworks/fastai), [XGBoost](https://github.com/clearml/clearml/tree/master/examples/frameworks/xgboost), [LightGBM](https://github.com/clearml/clearml/tree/master/examples/frameworks/lightgbm), [MegEngine](https://github.com/clearml/clearml/tree/master/examples/frameworks/megengine) and [Scikit-Learn](https://github.com/clearml/clearml/tree/master/examples/frameworks/scikit-learn)
* Seamless integration (including version control) with [**Jupyter Notebook**](https://jupyter.org/)
and [*PyCharm* remote debugging](https://github.com/allegroai/trains-pycharm-plugin)
and [*PyCharm* remote debugging](https://github.com/clearml/trains-pycharm-plugin)
#### [Start using ClearML](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps)
@@ -163,13 +162,13 @@ The ClearML run-time components:
* The ClearML Server - for storing experiment, model, and workflow data; supporting the Web UI experiment manager and MLOps automation for reproducibility and tuning. It is available as a hosted service and open source for you to deploy your own ClearML Server.
* The ClearML Agent - for MLOps orchestration, experiment and workflow reproducibility, and scalability.
<img src="https://raw.githubusercontent.com/allegroai/clearml-docs/main/docs/img/clearml_architecture.png" width="100%" alt="clearml-architecture">
<img src="https://raw.githubusercontent.com/clearml/clearml-docs/main/docs/img/clearml_architecture.png" width="100%" alt="clearml-architecture">
## Additional Modules
- [clearml-session](https://github.com/allegroai/clearml-session) - **Launch remote JupyterLab / VSCode-server inside any docker, on Cloud/On-Prem machines**
- [clearml-task](https://github.com/allegroai/clearml/blob/master/docs/clearml-task.md) - Run any codebase on remote machines with full remote logging of Tensorboard, Matplotlib & Console outputs
- [clearml-data](https://github.com/allegroai/clearml/blob/master/docs/datasets.md) - **CLI for managing and versioning your datasets, including creating / uploading / downloading of data from S3/GS/Azure/NAS**
- [clearml-session](https://github.com/clearml/clearml-session) - **Launch remote JupyterLab / VSCode-server inside any docker, on Cloud/On-Prem machines**
- [clearml-task](https://github.com/clearml/clearml/blob/master/docs/clearml-task.md) - Run any codebase on remote machines with full remote logging of Tensorboard, Matplotlib & Console outputs
- [clearml-data](https://github.com/clearml/clearml/blob/master/docs/datasets.md) - **CLI for managing and versioning your datasets, including creating / uploading / downloading of data from S3/GS/Azure/NAS**
- [AWS Auto-Scaler](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler) - Automatically spin EC2 instances based on your workloads with preconfigured budget! No need for AKE!
- [Hyper-Parameter Optimization](https://clear.ml/docs/latest/docs/guides/optimization/hyper-parameter-optimization/examples_hyperparam_opt) - Optimize any code with black-box approach and state-of-the-art Bayesian optimization algorithms
- [Automation Pipeline](https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_controller) - Build pipelines based on existing experiments / jobs, supports building pipelines of pipelines!
@@ -212,7 +211,7 @@ If ClearML is part of your development process / project / publication, please c
@misc{clearml,
title = {ClearML - Your entire MLOps stack in one open-source tool},
year = {2024},
note = {Software available from http://github.com/allegroai/clearml},
note = {Software available from http://github.com/clearml/clearml},
url={https://clear.ml/},
author = {ClearML},
}
@@ -222,17 +221,17 @@ author = {ClearML},
For more information, see the [official documentation](https://clear.ml/docs) and [on YouTube](https://www.youtube.com/c/ClearML).
For examples and use cases, check the [examples folder](https://github.com/allegroai/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides).
For examples and use cases, check the [examples folder](https://github.com/clearml/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides).
If you have any questions: post on our [Slack Channel](https://joinslack.clear.ml), or tag your questions on [stackoverflow](https://stackoverflow.com/questions/tagged/clearml) with '**[clearml](https://stackoverflow.com/questions/tagged/clearml)**' tag (*previously [trains](https://stackoverflow.com/questions/tagged/trains) tag*).
For feature requests or bug reports, please use [GitHub issues](https://github.com/allegroai/clearml/issues).
For feature requests or bug reports, please use [GitHub issues](https://github.com/clearml/clearml/issues).
Additionally, you can always find us at *info@clear.ml*
## Contributing
**PRs are always welcome** :heart: See more details in the ClearML [Guidelines for Contributing](https://github.com/allegroai/clearml/blob/master/docs/contributing.md).
**PRs are always welcome** :heart: See more details in the ClearML [Guidelines for Contributing](https://github.com/clearml/clearml/blob/master/docs/contributing.md).
_May the force (and the goddess of learning rates) be with you!_


@@ -15,7 +15,7 @@ from ..backend_api.session import defs
from ..backend_api.session.client import APIClient
from ..debugging import get_logger
# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ":"
# Worker's id in clearml would be composed of prefix, name, instance_type and cloud_id separated by ":"
# Example: 'test:m1:g4dn.4xlarge:i-07cf7d6750455cb62'
# cloud_id might be missing
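
The worker id layout described in the comment above can be illustrated with a small parser. This is only a sketch, not code from the repository: `WorkerId` and `parse_worker_id` are hypothetical names, and it assumes that none of the fields contains a ":" itself.

```python
from typing import NamedTuple, Optional


class WorkerId(NamedTuple):
    """Hypothetical container for the parts of a worker id."""
    prefix: str
    name: str
    instance_type: str
    cloud_id: Optional[str]  # may be missing, as the comment above notes


def parse_worker_id(worker_id):
    # type: (str) -> WorkerId
    """Split 'prefix:name:instance_type[:cloud_id]' into its parts."""
    parts = worker_id.split(":")
    if len(parts) not in (3, 4):
        raise ValueError("unexpected worker id format: {!r}".format(worker_id))
    # the fourth field is optional
    cloud_id = parts[3] if len(parts) == 4 else None
    return WorkerId(parts[0], parts[1], parts[2], cloud_id)
```

With the example id from the comment, `parse_worker_id('test:m1:g4dn.4xlarge:i-07cf7d6750455cb62')` yields the four fields in order; an id without the trailing part yields `cloud_id=None`.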


@@ -59,6 +59,7 @@ class PipelineController(object):
_update_execution_plot_interval = 5.*60
_update_progress_interval = 10.
_monitor_node_interval = 5.*60
_pipeline_as_sub_project_cached = None
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
_evaluated_return_values = {} # TID: pipeline_name
@@ -221,7 +222,8 @@ class PipelineController(object):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
enable_local_imports=True # type: bool
):
# type: (...) -> None
"""
@@ -316,6 +318,11 @@ class PipelineController(object):
the steps from functions, otherwise all global imports will be automatically imported in a safe manner at
the beginning of each step's execution. Default is False
:param working_dir: Working directory to launch the pipeline from.
:param enable_local_imports: If True, allow pipeline steps to import from local files
by appending to the PYTHONPATH of each step the directory the pipeline controller
script resides in (sys.path[0]).
If False, the directory won't be appended to PYTHONPATH. Default is True.
Ignored while running remotely.
"""
if auto_version_bump is not None:
warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
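
The `enable_local_imports` docstring above describes appending the controller script's directory (`sys.path[0]`) to each step's `PYTHONPATH`. A minimal sketch of that idea follows. It is an illustration under stated assumptions, not the code this commit adds: `build_step_env` and its parameters are hypothetical, and how ClearML actually passes the environment to a step is not shown in this diff.

```python
import os
import sys


def build_step_env(enable_local_imports=True, controller_dir=None, base_env=None):
    """Return a copy of the environment, optionally extended for local imports.

    Hypothetical helper: mirrors the behaviour described in the docstring,
    not the library's implementation.
    """
    env = dict(os.environ if base_env is None else base_env)
    if not enable_local_imports:
        # leave PYTHONPATH untouched
        return env
    if controller_dir is None:
        # directory of the script that started the interpreter
        controller_dir = sys.path[0]
    existing = env.get("PYTHONPATH", "")
    # append (not prepend), so existing entries keep their priority
    env["PYTHONPATH"] = (existing + os.pathsep + controller_dir) if existing else controller_dir
    return env
```

Appending rather than prepending means modules already reachable through `PYTHONPATH` are not shadowed by same-named local files.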
@@ -328,7 +335,7 @@ class PipelineController(object):
self._version = str(version).strip() if version else None
if self._version and not Version.is_valid_version_string(self._version):
raise ValueError(
"Setting non-semantic dataset version '{}'".format(self._version)
"Setting non-semantic pipeline version '{}'".format(self._version)
)
self._pool_frequency = pool_frequency * 60.
self._thread = None
@@ -348,19 +355,13 @@ class PipelineController(object):
self._reporting_lock = RLock()
self._pipeline_task_status_failed = None
self._mock_execution = False # used for nested pipelines (eager execution)
self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
self._last_progress_update_time = 0
self._artifact_serialization_function = artifact_serialization_function
self._artifact_deserialization_function = artifact_deserialization_function
self._skip_global_imports = skip_global_imports
self._enable_local_imports = enable_local_imports
if not self._task:
task_name = name or project or '{}'.format(datetime.now())
if self._pipeline_as_sub_project:
parent_project = (project + "/" if project else "") + self._project_section
project_name = "{}/{}".format(parent_project, task_name)
else:
parent_project = None
project_name = project or 'Pipelines'
pipeline_project_args = self._create_pipeline_project_args(name, project)
# if user disabled the auto-repo, we force local script storage (repo="" or repo=False)
set_force_local_repo = False
@@ -369,8 +370,8 @@ class PipelineController(object):
set_force_local_repo = True
self._task = Task.init(
project_name=project_name,
task_name=task_name,
project_name=pipeline_project_args["project_name"],
task_name=pipeline_project_args["task_name"],
task_type=Task.TaskTypes.controller,
auto_resource_monitoring=False,
reuse_last_task_id=False
@@ -382,15 +383,13 @@ class PipelineController(object):
self._task._wait_for_repo_detection(timeout=300.)
Task.force_store_standalone_script(force=False)
# make sure project is hidden
if self._pipeline_as_sub_project:
get_or_create_project(
self._task.session, project_name=parent_project, system_tags=["hidden"])
get_or_create_project(
self._task.session, project_name=project_name,
project_id=self._task.project, system_tags=self._project_system_tags)
self._create_pipeline_projects(
task=self._task,
parent_project=pipeline_project_args["parent_project"],
project_name=pipeline_project_args["project_name"],
)
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
if output_uri is not None:
self._task.output_uri = output_uri
self._output_uri = output_uri
@@ -401,7 +400,7 @@ class PipelineController(object):
self._task.set_script(repository=repo, branch=repo_branch, commit=repo_commit, working_dir=working_dir)
self._auto_connect_task = bool(self._task)
# make sure we add to the main Task the pipeline tag
if self._task and not self._pipeline_as_sub_project:
if self._task and not self._pipeline_as_sub_project():
self._task.add_tags([self._tag])
self._monitored_nodes = {} # type: Dict[str, dict]
@@ -411,9 +410,9 @@ class PipelineController(object):
else self._default_retry_on_failure_callback
# add direct link to the pipeline page
if self._pipeline_as_sub_project and self._task:
if self._pipeline_as_sub_project() and self._task:
if add_run_number and self._task.running_locally():
self._add_pipeline_name_run_number()
self._add_pipeline_name_run_number(self._task)
# noinspection PyProtectedMember
self._task.get_logger().report_text('ClearML pipeline page: {}'.format(
'{}/pipelines/{}/experiments/{}'.format(
@@ -423,6 +422,12 @@ class PipelineController(object):
))
)
@classmethod
def _pipeline_as_sub_project(cls):
if cls._pipeline_as_sub_project_cached is None:
cls._pipeline_as_sub_project_cached = bool(Session.check_min_api_server_version("2.17"))
return cls._pipeline_as_sub_project_cached
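
The change above replaces a per-instance attribute with a classmethod that caches its result on the class, which lets the new classmethods (`create`, `clone`) use the check without an instance. The pattern can be sketched in isolation as follows; `ServerFeatures`, `_query_server` and `_checks` are hypothetical stand-ins, and `_query_server` merely imitates a call such as `Session.check_min_api_server_version("2.17")`.

```python
class ServerFeatures(object):
    _sub_project_cached = None  # None means "not checked yet"
    _checks = 0  # counts how often the (simulated) server is queried

    @staticmethod
    def _query_server():
        # stand-in for a potentially slow server version check
        ServerFeatures._checks += 1
        return True

    @classmethod
    def sub_project_supported(cls):
        # query once, then reuse the cached boolean on later calls
        if cls._sub_project_cached is None:
            cls._sub_project_cached = bool(cls._query_server())
        return cls._sub_project_cached
```

Note that assigning through `cls` stores the value on whichever class made the call, so a subclass would keep its own cached copy.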
def set_default_execution_queue(self, default_execution_queue):
# type: (Optional[str]) -> None
"""
@@ -1224,7 +1229,7 @@ class PipelineController(object):
:param bool wait_on_upload: Whether the upload should be synchronous, forcing the upload to complete
before continuing.
:param Callable[Any, Union[bytes, bytearray]] serialization_function: A serialization function that takes one
:param serialization_function: A serialization function that takes one
parameter of any type which is the object to be serialized. The function should return
a `bytes` or `bytearray` object, which represents the serialized object. Note that the object will be
immediately serialized using this function, thus other serialization methods will not be used
@@ -1440,6 +1445,170 @@ class PipelineController(object):
"""
return self._pipeline_args
@classmethod
def _create_pipeline_project_args(cls, name, project):
task_name = name or project or '{}'.format(datetime.now())
if cls._pipeline_as_sub_project():
parent_project = (project + "/" if project else "") + cls._project_section
project_name = "{}/{}".format(parent_project, task_name)
else:
parent_project = None
project_name = project or 'Pipelines'
return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name}
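
To make the naming rules in `_create_pipeline_project_args` easier to follow, here is a standalone sketch of the same logic. The function name and the `as_sub_project` and `project_section` parameters are hypothetical; in particular, the value `".pipelines"` for `_project_section` is an assumption, since the attribute's value does not appear in this diff.

```python
from datetime import datetime


def pipeline_project_args(name, project, as_sub_project=True, project_section=".pipelines"):
    """Compute task and project names the way the classmethod above does."""
    # fall back to the project name, then to a timestamp
    task_name = name or project or "{}".format(datetime.now())
    if as_sub_project:
        # pipelines live under a dedicated section inside the user project
        parent_project = (project + "/" if project else "") + project_section
        project_name = "{}/{}".format(parent_project, task_name)
    else:
        parent_project = None
        project_name = project or "Pipelines"
    return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name}
```

Under these assumptions, `pipeline_project_args("train", "demo")` places the pipeline in `demo/.pipelines/train`, while `as_sub_project=False` keeps it directly in `demo`.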
@classmethod
def _create_pipeline_projects(cls, task, parent_project, project_name):
# make sure project is hidden
if not cls._pipeline_as_sub_project():
return
get_or_create_project(Task._get_default_session(), project_name=parent_project, system_tags=["hidden"])
return get_or_create_project(
Task._get_default_session(),
project_name=project_name,
project_id=task.project,
system_tags=cls._project_system_tags,
)
@classmethod
def create(
cls,
project_name, # type: str
task_name, # type: str
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
argparse_args=None, # type: Optional[Sequence[Tuple[str, str]]]
force_single_script_file=False, # type: bool
version=None, # type: Optional[str]
add_run_number=True, # type: bool
):
# type: (...) -> PipelineController
"""
Manually create and populate a new Pipeline in the system.
Supports pipelines from functions, decorators and tasks.
:param project_name: Set the project name for the pipeline.
:param task_name: Set the name of the remote pipeline.
:param repo: Remote URL for the repository to use, or path to local copy of the git repository.
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'. If ``repo`` is specified, then
the ``script`` parameter must also be specified
:param branch: Select specific repository branch/tag (implies the latest commit from the branch)
:param commit: Select specific commit ID to use (default: latest commit,
or when used with local repository matching the local commit ID)
:param script: Specify the entry point script for the remote execution. When used in tandem with
remote git repository the script should be a relative path inside the repository,
for example: './source/train.py' . When used with local repository path it supports a
direct path to a file inside the local repository itself, for example: '~/project/source/train.py'
:param working_directory: Working directory to launch the script from. Default: repository root folder.
Relative to repo root or local folder.
:param packages: Manually specify a list of required packages. Example: ``["tqdm>=2.1", "scikit-learn"]``
or `True` to automatically create requirements
based on locally installed packages (repository must be local).
:param requirements_file: Specify requirements.txt file to install when setting the session.
If not provided, the requirements.txt from the repository will be used.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value)
Notice, only supported if the codebase itself uses argparse.ArgumentParser
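:param force_single_script_file: If True, do not auto-detect local repository
:param version: The pipeline version, stored as a user property on the new pipeline.
If not provided, the default pipeline version is used.
:param add_run_number: If True (default), add a run number to the pipeline name.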
:return: The newly created PipelineController
"""
pipeline_project_args = cls._create_pipeline_project_args(
name=task_name, project=project_name
)
pipeline_controller = Task.create(
project_name=pipeline_project_args["project_name"],
task_name=pipeline_project_args["task_name"],
task_type=Task.TaskTypes.controller,
repo=repo,
branch=branch,
commit=commit,
script=script,
working_directory=working_directory,
packages=packages,
requirements_file=requirements_file,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
argparse_args=argparse_args,
add_task_init_call=False,
force_single_script_file=force_single_script_file
)
cls._create_pipeline_projects(
task=pipeline_controller,
parent_project=pipeline_project_args["parent_project"],
project_name=pipeline_project_args["project_name"],
)
pipeline_controller.set_system_tags((pipeline_controller.get_system_tags() or []) + [cls._tag])
pipeline_controller.set_user_properties(version=version or cls._default_pipeline_version)
if add_run_number:
cls._add_pipeline_name_run_number(pipeline_controller)
print(pipeline_controller.get_output_log_web_page())
return cls._create_pipeline_controller_from_task(pipeline_controller)
@classmethod
def clone(
cls,
pipeline_controller, # type: Union[PipelineController, str]
name=None, # type: Optional[str]
comment=None, # type: Optional[str]
parent=None, # type: Optional[str]
project=None, # type: Optional[str]
version=None # type: Optional[str]
):
# type: (...) -> PipelineController
"""
Create a duplicate (a clone) of a pipeline (experiment). The status of the cloned pipeline is ``Draft``
and modifiable.
:param str pipeline_controller: The pipeline to clone. Specify a PipelineController object or an ID.
:param str name: The name of the new cloned pipeline.
:param str comment: A comment / description for the new cloned pipeline.
:param str parent: The ID of the parent Task of the new pipeline.
- If ``parent`` is not specified, then ``parent`` is set to ``source_task.parent``.
- If ``parent`` is not specified and ``source_task.parent`` is not available,
then ``parent`` is set to ``source_task``.
:param str project: The project name in which to create the new pipeline.
If ``None``, the clone inherits the original pipeline's project
:param str version: The version of the new cloned pipeline. If ``None``, the clone
inherits the original pipeline's version
:return: The new cloned PipelineController
"""
if isinstance(pipeline_controller, six.string_types):
pipeline_controller = Task.get_task(task_id=pipeline_controller)
elif isinstance(pipeline_controller, PipelineController):
pipeline_controller = pipeline_controller.task
if project or name:
pipeline_project_args = cls._create_pipeline_project_args(
name=name or pipeline_controller.name, project=project or pipeline_controller.get_project_name()
)
project = cls._create_pipeline_projects(
task=pipeline_controller,
parent_project=pipeline_project_args["parent_project"],
project_name=pipeline_project_args["project_name"],
)
name = pipeline_project_args["task_name"]
cloned_controller = Task.clone(
source_task=pipeline_controller, name=name, comment=comment, parent=parent, project=project
)
if version:
cloned_controller.set_user_properties(version=version)
return cls._create_pipeline_controller_from_task(cloned_controller)
@classmethod
def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False):
# type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any
@@ -1555,6 +1724,10 @@ class PipelineController(object):
error_msg += ", pipeline_version={}".format(pipeline_version)
raise ValueError(error_msg)
pipeline_task = Task.get_task(task_id=pipeline_id)
return cls._create_pipeline_controller_from_task(pipeline_task)
@classmethod
def _create_pipeline_controller_from_task(cls, pipeline_task):
pipeline_object = cls.__new__(cls)
pipeline_object._task = pipeline_task
pipeline_object._nodes = {}
@@ -1566,6 +1739,11 @@ class PipelineController(object):
pass
return pipeline_object
@property
def task(self):
# type: () -> Task
return self._task
@property
def id(self):
# type: () -> str
@@ -2473,7 +2651,7 @@ class PipelineController(object):
extra_args = dict()
extra_args["project"] = self._get_target_project(return_project_id=True) or None
# set Task name to match job name
if self._pipeline_as_sub_project:
if self._pipeline_as_sub_project():
extra_args["name"] = node.name
if node.explicit_docker_image:
extra_args["explicit_docker_image"] = node.explicit_docker_image
@@ -2507,6 +2685,7 @@ class PipelineController(object):
task_overrides=task_overrides,
allow_caching=node.cache_executed_step,
output_uri=node.output_uri,
enable_local_imports=self._enable_local_imports,
**extra_args
)
except Exception:
@@ -3194,23 +3373,24 @@ class PipelineController(object):
session=self._task.session if self._task else Task.default_session,
project_name=self._target_project)
def _add_pipeline_name_run_number(self):
@classmethod
def _add_pipeline_name_run_number(cls, task):
# type: (Task) -> None
if not self._task:
if not task:
return
# if we were already executed, do not rename (meaning aborted pipeline that was continued)
# noinspection PyProtectedMember
if self._task._get_runtime_properties().get(self._runtime_property_hash):
if task._get_runtime_properties().get(cls._runtime_property_hash):
return
# remove the #<num> suffix if we have one:
task_name = re.compile(r" #\d+$").split(self._task.name or "", 1)[0]
task_name = re.compile(r" #\d+$").split(task.name or "", 1)[0]
page_size = 100
# find exact name or " #<num>" extension
prev_pipelines_ids = self._task.query_tasks(
prev_pipelines_ids = task.query_tasks(
task_name=r"^{}(| #\d+)$".format(task_name),
task_filter=dict(
project=[self._task.project], system_tags=[self._tag],
project=[task.project], system_tags=[cls._tag],
order_by=['-created'],
page_size=page_size,
fetch_only_first_page=True,
@@ -3223,8 +3403,8 @@ class PipelineController(object):
# worst case fail to auto increment
try:
# we assume we are the latest so let's take a few (last 10) and check the max number
last_task_name = self._task.query_tasks(
task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[self._task.project]),
last_task_name = task.query_tasks(
task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]),
additional_return_fields=['name'],
) # type: List[Dict]
# let's parse the names
@@ -3243,7 +3423,7 @@ class PipelineController(object):
max_value = 0
if max_value > 1:
self._task.set_name(task_name + " #{}".format(max_value))
task.set_name(task_name + " #{}".format(max_value))
@classmethod
def _get_pipeline_task(cls):
@@ -3497,7 +3677,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
enable_local_imports=True # type: bool
):
# type: (...) -> ()
"""
@@ -3585,6 +3766,11 @@ class PipelineDecorator(PipelineController):
global imports will be automatically imported in a safe manner at the beginning of each step's execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
:param enable_local_imports: If True, allow pipeline steps to import from local files
by appending to the PYTHONPATH of each step the directory the pipeline controller
script resides in (sys.path[0]).
If False, the directory won't be appended to PYTHONPATH. Default is True.
Ignored while running remotely.
"""
super(PipelineDecorator, self).__init__(
name=name,
@@ -3608,9 +3794,9 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir
working_dir=working_dir,
enable_local_imports=enable_local_imports
)
# if we are in eager execution, make sure parent class knows it
if self._eager_execution_instance:
self._mock_execution = True
@@ -3898,7 +4084,7 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports,
working_dir=working_dir,
working_dir=working_dir
)
return task_definition
@@ -4429,7 +4615,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
working_dir=None, # type: Optional[str]
enable_local_imports=True # type: bool
):
# type: (...) -> Callable
"""
@@ -4548,6 +4735,11 @@ class PipelineDecorator(PipelineController):
global imports will be automatically imported in a safe manner at the beginning of each step's execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
:param enable_local_imports: If True, allow pipeline steps to import from local files
by appending to the PYTHONPATH of each step the directory the pipeline controller
script resides in (sys.path[0]).
If False, the directory won't be appended to PYTHONPATH. Default is True.
Ignored while running remotely.
"""
def decorator_wrap(func):
@@ -4596,7 +4788,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir
working_dir=working_dir,
enable_local_imports=enable_local_imports
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@@ -4650,7 +4843,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir
working_dir=working_dir,
enable_local_imports=enable_local_imports
)
a_pipeline._args_map = args_map or {}
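
The run-number renaming in `_add_pipeline_name_run_number` can be illustrated with a small standalone sketch. It reuses the ` #<num>` suffix regex shown in the hunk above; everything else is an assumption for illustration: `strip_run_suffix` and `next_run_name` are hypothetical helpers, the list of previous names stands in for the `query_tasks` result, and the base name is escaped with `re.escape`, which the diff itself does not do.

```python
import re

# Same suffix pattern as in the diff: a trailing " #<num>"
_SUFFIX = re.compile(r" #\d+$")


def strip_run_suffix(name):
    """Drop a trailing ' #<num>' from a pipeline name, if there is one."""
    return _SUFFIX.split(name or "", 1)[0]


def next_run_name(name, previous_names):
    """Hypothetical helper: pick the next ' #<num>' name given earlier run names."""
    base = strip_run_suffix(name)
    # Match the exact base name or the base name followed by " #<num>"
    pattern = re.compile(r"^{}(| #(\d+))$".format(re.escape(base)))
    numbers = []
    for prev in previous_names:
        match = pattern.match(prev)
        if match:
            # A name without a suffix counts as run number 1 (assumption)
            numbers.append(int(match.group(2)) if match.group(2) else 1)
    max_value = max(numbers) + 1 if numbers else 0
    # Mirror the diff: only rename when the computed number is greater than 1
    return "{} #{}".format(base, max_value) if max_value > 1 else base


print(next_run_name("train", ["train", "train #2", "other"]))
```

The first run keeps its plain name, and later runs get an incrementing suffix.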

View File

@@ -522,6 +522,7 @@ class ClearmlJob(BaseJob):
allow_caching=False, # type: bool
target_project=None, # type: Optional[str]
output_uri=None, # type: Optional[Union[str, bool]]
enable_local_imports=True, # type: bool
**kwargs # type: Any
):
# type: (...) -> ()
@@ -545,12 +546,17 @@ class ClearmlJob(BaseJob):
If True, use the base_task_id directly (base-task must be in draft-mode / created),
:param bool allow_caching: If True, check if we have a previously executed Task with the same specification.
If we do, use it and set internal is_cached flag. Default False (always create new Task).
:param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for
:param output_uri: The storage / output url for this job. This is the default location for
output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param str target_project: Optional, Set the target project name to create the cloned Task in.
:param enable_local_imports: If True, allow jobs to import from local files
by appending sys.path[0] to PYTHONPATH.
If False, the current path directory won't be appended to PYTHONPATH. Default is True.
Ignored while running remotely.
"""
super(ClearmlJob, self).__init__()
base_temp_task = Task.get_task(task_id=base_task_id)
self._enable_local_imports = enable_local_imports
if disable_clone_task:
self.task = base_temp_task
task_status = self.task.status
@@ -717,6 +723,14 @@ class LocalClearmlJob(ClearmlJob):
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
try:
if self._enable_local_imports:
current_python_path = env.get("PYTHONPATH")
env["PYTHONPATH"] = (
os.pathsep.join([current_python_path, sys.path[0]]) if current_python_path else sys.path[0]
)
except Exception as e:
logger.warning("Could not append local path to PYTHONPATH: {}".format(e))
self.task.mark_started()
self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
return True
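
A minimal sketch of the `enable_local_imports` behavior, written as a pure function so it can be tried without a ClearML server. `env_with_local_imports` and its `local_dir` argument are hypothetical names; the sketch joins entries with `os.pathsep`, an assumption made here for portability across platforms.

```python
import os
import sys


def env_with_local_imports(env, local_dir=None, enable_local_imports=True):
    """Return a copy of env with local_dir appended to PYTHONPATH (hypothetical helper)."""
    new_env = dict(env)
    if not enable_local_imports:
        # Feature disabled: leave PYTHONPATH untouched
        return new_env
    # Default to the directory of the launching script, as the docstring describes
    if local_dir is None:
        local_dir = sys.path[0]
    current = new_env.get("PYTHONPATH")
    new_env["PYTHONPATH"] = os.pathsep.join([current, local_dir]) if current else local_dir
    return new_env


print(env_with_local_imports({"PYTHONPATH": "/opt/libs"}, local_dir="/home/user/project"))
```

Returning a copy rather than mutating the input keeps the caller's environment unchanged if launching the subprocess fails later.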

View File

@@ -1290,7 +1290,7 @@ class HyperParameterOptimizer(object):
:param str base_task_id: The Task ID to be used as template experiment to optimize.
:param list hyper_parameters: The list of Parameter objects to optimize over.
:param Union[str, Sequence[str]] objective_metric_title: The Objective metric title(s) to maximize / minimize
:param objective_metric_title: The Objective metric title(s) to maximize / minimize
(for example, ``validation``, ``["validation", "loss"]``). If ``objective_metric_title`` is a sequence
(used to optimize multiple objectives at the same time), then ``objective_metric_series`` and
``objective_metric_sign`` have to be sequences of the same length. Each title will be matched

View File

@@ -2,7 +2,8 @@ from .datamodel import DataModel
class ApiModel(DataModel):
""" API-related data model """
"""API-related data model"""
_service = None
_action = None
_version = None

View File

@@ -98,9 +98,7 @@ class StrictSession(Session):
"""
def init():
super(StrictSession, self).__init__(
initialize_logging=initialize_logging, *args, **kwargs
)
super(StrictSession, self).__init__(initialize_logging=initialize_logging, *args, **kwargs)
if not config_file:
init()
@@ -142,17 +140,17 @@ class Response(object):
self.response = None
self._result = result
response = getattr(result, "response", result)
if getattr(response, "_service") == "events" and \
getattr(response, "_action") in ("scalar_metrics_iter_histogram",
"multi_task_scalar_metrics_iter_histogram",
"vector_metrics_iter_histogram",
):
if getattr(response, "_service") == "events" and getattr(response, "_action") in (
"scalar_metrics_iter_histogram",
"multi_task_scalar_metrics_iter_histogram",
"vector_metrics_iter_histogram",
):
# put all the response data under metrics:
response.metrics = result.response_data
# noinspection PyProtectedMember
if 'metrics' not in response.__class__._get_data_props():
if "metrics" not in response.__class__._get_data_props():
# noinspection PyProtectedMember
response.__class__._data_props_list['metrics'] = 'metrics'
response.__class__._data_props_list["metrics"] = "metrics"
if dest:
response = getattr(response, dest)
self.response = response
@@ -170,11 +168,7 @@ class Response(object):
return repr(self.response)
def __dir__(self):
fields = [
name
for name in dir(self.response)
if isinstance(getattr(type(self.response), name, None), property)
]
fields = [name for name in dir(self.response) if isinstance(getattr(type(self.response), name, None), property)]
return list(set(chain(super(Response, self).__dir__(), fields)) - {"response"})
@@ -224,7 +218,7 @@ class TableResponse(Response):
return "" if result is None else result
fields = fields or self.fields
return '\n'.join(str(dict((attr, getter(item, attr)) for attr in fields)) for item in self)
return "\n".join(str(dict((attr, getter(item, attr)) for attr in fields)) for item in self)
def display(self, fields=None):
print(self._format_table(fields=fields))
@@ -251,10 +245,7 @@ class TableResponse(Response):
item
for item in self
if (not predicate or predicate(item))
and all(
compare_enum(getattr(item, key), value)
for key, value in kwargs.items()
)
and all(compare_enum(getattr(item, key), value) for key, value in kwargs.items())
],
)
@@ -393,9 +384,7 @@ def make_action(service, request_cls):
@wrap
def get(self, *args, **kwargs):
return entity(
self, self.session.send(request_cls(*args, **kwargs)).response
)
return entity(self, self.session.send(request_cls(*args, **kwargs)).response)
elif action == "create":
@@ -403,9 +392,7 @@ def make_action(service, request_cls):
def get(self, *args, **kwargs):
return entity(
self,
Namespace(
id=self.session.send(request_cls(*args, **kwargs)).response.id
),
Namespace(id=self.session.send(request_cls(*args, **kwargs)).response.id),
)
elif action in ["get_all", "get_all_ex"]:
@@ -458,7 +445,8 @@ def get_requests(service):
return OrderedDict(
(key, value)
for key, value in sorted(
vars(service.__wrapped__ if hasattr(service, '__wrapped__') else service).items(), key=itemgetter(0))
vars(service.__wrapped__ if hasattr(service, "__wrapped__") else service).items(), key=itemgetter(0)
)
if isinstance(value, type) and issubclass(value, APIRequest) and value._action
)
@@ -476,10 +464,7 @@ def make_service_class(module):
]
)
properties.update(
(f.__name__, f)
for f in (
make_action(module, value) for key, value in get_requests(module).items()
)
(f.__name__, f) for f in (make_action(module, value) for key, value in get_requests(module).items())
)
# noinspection PyTypeChecker
return type(str(module_name(module)), (Service,), properties)
@@ -504,14 +489,12 @@ class Version(Entity):
except AttributeError:
published = False
self.data = self._service.get_versions(
dataset=self.dataset, only_published=published, versions=[self.id]
)[0].data
self.data = self._service.get_versions(dataset=self.dataset, only_published=published, versions=[self.id])[
0
].data
def _get_default_kwargs(self):
return dict(
super(Version, self)._get_default_kwargs(), **{"dataset": self.data.dataset}
)
return dict(super(Version, self)._get_default_kwargs(), **{"dataset": self.data.dataset})
class APIClient(object):
@@ -549,18 +532,13 @@ class APIClient(object):
if mod
)
else:
services = OrderedDict(
(name, getattr(_api_services, name)) for name in _api_services.__all__
)
services = OrderedDict((name, getattr(_api_services, name)) for name in _api_services.__all__)
self._update_services(services)
def _update_services(self, services):
# type: (Dict[str, types.ModuleType]) -> ()
self.__dict__.update(
dict(
{
name: make_service_class(module)(self.session)
for name, module in services.items()
},
{name: make_service_class(module)(self.session) for name, module in services.items()},
)
)

View File

@@ -204,6 +204,7 @@ class Session(TokenManager):
"api.auth.request_token_expiration_sec",
self.config.get("api.auth.req_token_expiration_sec", None)
)
self.__auth_token = None
self._update_default_api_method()
if ENV_AUTH_TOKEN.get():
@@ -379,6 +380,11 @@ class Session(TokenManager):
case (only once). This is done since permissions are embedded in the token, and addresses a case where
server-side permissions have changed but are not reflected in the current token. Refreshing the token will
generate a token with the updated permissions.
NOTE: This method does not handle authorization. Credentials or token should be provided using the auth or
headers arguments, otherwise a successful authorization depends on the session containing a valid cookie
set during the last login call (which may not be there if the server's cookie domain does not match the URL
we use to access the server).
"""
if self._offline_mode:
return None
@@ -842,13 +848,27 @@ class Session(TokenManager):
)
)
auth = None
headers = None
# use token only once (the second time the token is already built into the http session)
if self.__auth_token:
headers = dict(Authorization="Bearer {}".format(self.__auth_token))
self.__auth_token = None
auth = HTTPBasicAuth(self.access_key, self.secret_key) if self.access_key and self.secret_key else None
token = None
if self.__auth_token: # try using initially provided token
token = self.__auth_token
elif self.access_key and self.secret_key: # otherwise, try using basic auth (if key/secret exists)
auth = HTTPBasicAuth(self.access_key, self.secret_key)
else: # otherwise, use the latest raw token
token = self.raw_token
if token:
headers = dict(Authorization="Bearer {}".format(token))
if not auth and not headers:
# No authorization info, something went wrong
self._logger.warning(
"refreshing token with no authorization info (no token or credentials, this might fail "
"if session does not have a valid cookie)"
)
res = None
try:
res = self._send_request(
@@ -878,6 +898,10 @@ class Session(TokenManager):
if ENV_AUTH_TOKEN.get():
ENV_AUTH_TOKEN.set(resp["data"]["token"])
# the initial token should only be used once, but clear it only after a new token was actually
# generated, so it is still available for a retry if the refresh failed due to transient reasons
self.__auth_token = None
return resp["data"]["token"]
except LoginError:
six.reraise(*sys.exc_info())
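
The credential selection order in the token refresh hunk above can be sketched as a pure function. `select_refresh_auth` is a hypothetical helper, and a plain `(key, secret)` tuple stands in for the `HTTPBasicAuth` object used in the diff.

```python
def select_refresh_auth(initial_token, access_key, secret_key, raw_token):
    """Return (auth, headers) following the priority order shown in the diff."""
    auth = None
    headers = None
    token = None
    if initial_token:  # 1. try the initially provided token
        token = initial_token
    elif access_key and secret_key:  # 2. otherwise, basic auth if key/secret exist
        auth = (access_key, secret_key)
    else:  # 3. otherwise, fall back to the latest raw token
        token = raw_token
    if token:
        headers = {"Authorization": "Bearer {}".format(token)}
    return auth, headers


print(select_refresh_auth(None, "key", "secret", "old-token"))
```

When both return values are `None`, the caller has no authorization info at all, which is the case the diff logs a warning for.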

View File

@@ -11,12 +11,12 @@ if TYPE_CHECKING:
def get_items(cls):
""" get key/value items from an enum-like class (members represent enumeration key/value) """
return {k: v for k, v in vars(cls).items() if not k.startswith('_')}
"""get key/value items from an enum-like class (members represent enumeration key/value)"""
return {k: v for k, v in vars(cls).items() if not k.startswith("_")}
def get_options(cls):
""" get options from an enum-like class (members represent enumeration key/value) """
"""get options from an enum-like class (members represent enumeration key/value)"""
return get_items(cls).values()
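
A short usage sketch for `get_items` / `get_options`. The two functions are copied from the hunk above; `TaskStatus` is a hypothetical enum-like class used only for illustration.

```python
def get_items(cls):
    """get key/value items from an enum-like class (members represent enumeration key/value)"""
    return {k: v for k, v in vars(cls).items() if not k.startswith("_")}


def get_options(cls):
    """get options from an enum-like class (members represent enumeration key/value)"""
    return get_items(cls).values()


class TaskStatus(object):
    # Hypothetical enum-like class: public attributes are the members
    created = "created"
    in_progress = "in_progress"
    _internal = "not a member"  # leading underscore, so it is filtered out


print(get_items(TaskStatus))
print(sorted(get_options(TaskStatus)))
```

Because the filter looks only at the name prefix, any public method defined on the class would also be returned as an item.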

View File

@@ -231,7 +231,7 @@ class CreateAndPopulate(object):
repo_info.script["diff"] = a_repo_info.script["diff"] or ""
repo_info.script["entry_point"] = a_repo_info.script["entry_point"]
if a_create_requirements:
repo_info["requirements"] = a_repo_info.script.get("requirements") or {}
repo_info.script["requirements"] = a_repo_info.script.get("requirements") or {}
# check if we have no repository and no requirements raise error
if (

View File

@@ -695,6 +695,28 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
ignore_errors=ignore_errors
)
def stop_request(self, ignore_errors=True, force=False, status_message=None):
# type: (bool, bool, Optional[str]) -> ()
"""
Request a task to stop. This will not change the task status
but mark a request for an agent or SDK to actually stop the Task.
This will trigger the Task's abort callback, and at the end will
change the task status to stopped and kill the Task's processes.
Notice: calling this on your own Task will cause
the watchdog to call the on_abort callback and kill the process.
:param bool force: If not True, call fails if the task status is not 'in_progress'
:param bool ignore_errors: If False, raise an exception on error
:param str status_message: Optional, add status change message to the stop request.
This message will be stored as status_message on the Task's info panel
"""
# request task stop
return self.send(
tasks.StopRequest(self.id, force=force, status_reason="abort request", status_message=status_message),
ignore_errors=ignore_errors
)
def completed(self, ignore_errors=True):
# type: (bool) -> ()
"""

View File

@@ -257,6 +257,7 @@ class Artifacts(object):
max_preview_size_bytes = 65536
_flush_frequency_sec = 300.
_max_tmp_file_replace_attemps = 3
# notice these two should match
_save_format = '.csv.gz'
_compression = 'gzip'
@@ -1138,7 +1139,21 @@ class Artifacts(object):
temp_folder, prefix, suffix = self._temp_files_lookup.pop(local_filename)
fd, temp_filename = mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)
os.replace(local_filename, temp_filename)
for i in range(self._max_tmp_file_replace_attemps):
try:
os.replace(local_filename, temp_filename)
break
except PermissionError:
LoggerRoot.get_base_logger().warning(
"Failed to replace {} with {}. Attempts left: {}".format(
local_filename, temp_filename, self._max_tmp_file_replace_attemps - i
)
)
else:
# final attempt, and if it fails, throw an exception
# exception could be thrown on some Windows systems
os.replace(local_filename, temp_filename)
local_filename = temp_filename
os.rmdir(temp_folder)
except Exception as ex:
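
The retry loop above relies on Python's `for ... else`: the `else` branch runs only when the loop finishes without `break`, that is, when every retried attempt failed. A minimal standalone sketch, assuming a hypothetical helper `replace_with_retries` with an injectable `replace` callable so it can be exercised without real files:

```python
import os


def replace_with_retries(src, dst, attempts=3, replace=os.replace, on_retry=None):
    """Call replace(src, dst), retrying on PermissionError (hypothetical helper)."""
    for i in range(attempts):
        try:
            replace(src, dst)
            break  # success: skip the else branch
        except PermissionError:
            if on_retry:
                # Same countdown as the diff: remaining tries, including the final one
                on_retry(attempts - i)
    else:
        # All retried attempts failed: try one last time and let the error propagate
        replace(src, dst)


calls = []


def flaky_replace(src, dst):
    # Fails twice, then succeeds (simulates a briefly locked file)
    calls.append((src, dst))
    if len(calls) < 3:
        raise PermissionError("file is locked")


replace_with_retries("a.tmp", "b.tmp", replace=flaky_replace, on_retry=print)
print(len(calls))
```

With `attempts=3` the callable is invoked at most four times: three guarded attempts plus the final unguarded one.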

View File

@@ -1,6 +1,6 @@
from .dataset import FileEntry, Dataset
__all__ = [
"FileEntry",
"Dataset",
"FileEntry",
"Dataset",
]

View File

@@ -552,7 +552,7 @@ class Dataset(object):
k: v
for k, v in self._dataset_link_entries.items()
if not matches_any_wildcard(k, dataset_path, recursive=recursive)
and not matches_any_wildcard(v.link, dataset_path, recursive=recursive)
and not (matches_any_wildcard(v.link, dataset_path, recursive=recursive) or v.link == dataset_path)
}
removed = 0
@@ -2263,14 +2263,14 @@ class Dataset(object):
def _get_dataset_files(
self,
force=False,
selected_chunks=None,
lock_target_folder=False,
cleanup_target_folder=True,
target_folder=None,
max_workers=None
force=False, # type: bool
selected_chunks=None, # type: Optional[List[int]]
lock_target_folder=False, # type: bool
cleanup_target_folder=True, # type: bool
target_folder=None, # type: Optional[Path]
max_workers=None, # type: Optional[int]
link_entries_of_interest=None # type: Optional[Dict[str, LinkEntry]]
):
# type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str
"""
First, extracts the archive present on the ClearML server containing this dataset's files.
Then, download the remote files. Note that if a remote file was added to the ClearML server, then
@@ -2287,6 +2287,8 @@ class Dataset(object):
:param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID.
:param max_workers: Number of threads to be spawned when getting dataset files. Defaults
to the number of virtual cores.
:param link_entries_of_interest: Download only the external files in this dictionary.
Useful when one doesn't want to download all the files in a parent dataset, as some files might be removed.
:return: Path to the local storage where the data was downloaded
"""
@@ -2300,14 +2302,21 @@ class Dataset(object):
max_workers=max_workers
)
self._download_external_files(
target_folder=target_folder, lock_target_folder=lock_target_folder, max_workers=max_workers
target_folder=target_folder,
lock_target_folder=lock_target_folder,
max_workers=max_workers,
link_entries_of_interest=link_entries_of_interest,
)
return local_folder
def _download_external_files(
self, target_folder=None, lock_target_folder=False, max_workers=None
self,
target_folder=None,
lock_target_folder=False,
max_workers=None,
link_entries_of_interest=None
):
# (Union(Path, str), bool) -> None
# (Union(Path, str), bool, Optional[int], Optional[Dict[str, LinkEntry]]) -> None
"""
Downloads external files in the dataset. These files will be downloaded
at relative_path (the path relative to the target_folder). Note that
@@ -2318,6 +2327,8 @@ class Dataset(object):
:param lock_target_folder: If True, lock the target folder so the next cleanup will not delete it.
Notice you should unlock it manually, or wait for the process to finish for auto unlocking.
:param max_workers: Number of threads to be spawned when getting dataset files. Defaults to no multi-threading.
:param link_entries_of_interest: Download only the external files in this dictionary.
Useful when one doesn't want to download all the files in a parent dataset, as some files might be removed.
"""
def _download_link(link, target_path):
if os.path.exists(target_path):
@@ -2370,12 +2381,13 @@ class Dataset(object):
)[0]
).as_posix()
link_entries_of_interest = link_entries_of_interest or self._dataset_link_entries
if not max_workers:
for relative_path, link in self._dataset_link_entries.items():
for relative_path, link in link_entries_of_interest.items():
_submit_download_link(relative_path, link, target_folder)
else:
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for relative_path, link in self._dataset_link_entries.items():
for relative_path, link in link_entries_of_interest.items():
_submit_download_link(relative_path, link, target_folder, pool=pool)
def _extract_dataset_archive(
@@ -3224,7 +3236,8 @@ class Dataset(object):
force=force,
lock_target_folder=True,
cleanup_target_folder=False,
max_workers=max_workers
max_workers=max_workers,
link_entries_of_interest=self._dataset_link_entries
))
ds_base_folder.touch()
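
A small sketch of how the `link_entries_of_interest` fallback behaves. `select_link_entries` is a hypothetical helper that mirrors the `link_entries_of_interest or self._dataset_link_entries` expression from the hunk above, with plain dictionaries standing in for `LinkEntry` mappings.

```python
def select_link_entries(all_entries, link_entries_of_interest=None):
    """Mirror the `x or default` fallback used in _download_external_files."""
    return link_entries_of_interest or all_entries


parent_entries = {
    "images/a.png": "s3://bucket/a.png",
    "images/b.png": "s3://bucket/b.png",
}
# Only one entry is still relevant in the child dataset
child_entries = {"images/a.png": "s3://bucket/a.png"}

print(select_link_entries(parent_entries, child_entries))
print(select_link_entries(parent_entries))      # None -> all entries
print(select_link_entries(parent_entries, {}))  # empty dict -> also all entries
```

Note that with `or`, an empty dictionary is falsy, so it falls back to all entries rather than downloading nothing.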

View File

@@ -1,7 +1,22 @@
""" Debugging module """
from .timer import Timer
from .log import get_logger, get_null_logger, TqdmLog, add_options as add_log_options, \
apply_logging_args as parse_log_args, add_rotating_file_handler, add_time_rotating_file_handler
from .log import (
get_logger,
get_null_logger,
TqdmLog,
add_options as add_log_options,
apply_logging_args as parse_log_args,
add_rotating_file_handler,
add_time_rotating_file_handler,
)
__all__ = ["Timer", "get_logger", "get_null_logger", "TqdmLog", "add_log_options", "parse_log_args",
"add_rotating_file_handler", "add_time_rotating_file_handler"]
__all__ = [
"Timer",
"get_logger",
"get_null_logger",
"TqdmLog",
"add_log_options",
"parse_log_args",
"add_rotating_file_handler",
"add_time_rotating_file_handler",
]

View File

@@ -21,7 +21,7 @@ def _thread_linux_id():
def _thread_py_id():
# return threading.get_ident()
return zipfile.crc32(int(threading.get_ident()).to_bytes(8, 'little'))
return zipfile.crc32(int(threading.get_ident()).to_bytes(8, "little"))
def _log_stderr(name, fnc, args, kwargs, is_return):
@@ -32,20 +32,22 @@ def _log_stderr(name, fnc, args, kwargs, is_return):
return
if __trace_level not in (1, 2, -1, -2):
return
fnc_address = str(fnc).split(' at ')
fnc_address = '{}'.format(fnc_address[-1].replace('>', '')) if len(fnc_address) > 1 else ''
fnc_address = str(fnc).split(" at ")
fnc_address = "{}".format(fnc_address[-1].replace(">", "")) if len(fnc_address) > 1 else ""
if __trace_level == 1 or __trace_level == -1:
t = '{:14} {}'.format(fnc_address, name)
t = "{:14} {}".format(fnc_address, name)
elif __trace_level == 2 or __trace_level == -2:
a_args = str(args)[1:-1] if args else ''
a_kwargs = ' {}'.format(kwargs) if kwargs else ''
t = '{:14} {} ({}{})'.format(fnc_address, name, a_args, a_kwargs)
a_args = str(args)[1:-1] if args else ""
a_kwargs = " {}".format(kwargs) if kwargs else ""
t = "{:14} {} ({}{})".format(fnc_address, name, a_args, a_kwargs)
# get a nicer thread id
h = int(__thread_id())
ts = time.time() - __trace_start
__stream_write('{}{:<9.3f}:{:5}:{:8x}: [{}] {}\n'.format(
'-' if is_return else '', ts, os.getpid(),
h, threading.current_thread().name, t))
__stream_write(
"{}{:<9.3f}:{:5}:{:8x}: [{}] {}\n".format(
"-" if is_return else "", ts, os.getpid(), h, threading.current_thread().name, t
)
)
if __stream_flush:
__stream_flush()
except Exception:
@@ -64,6 +66,7 @@ def _traced_call_method(name, fnc):
if r:
raise r
return ret
return _traced_call_int
@@ -82,7 +85,7 @@ def _traced_call_cls(name, fnc):
raise r
return ret
return WrapperClass.__dict__['_traced_call_int']
return WrapperClass.__dict__["_traced_call_int"]
def _traced_call_static(name, fnc):
@@ -99,7 +102,8 @@ def _traced_call_static(name, fnc):
if r:
raise r
return ret
return WrapperStatic.__dict__['_traced_call_int']
return WrapperStatic.__dict__["_traced_call_int"]
def _traced_call_func(name, fnc):
@@ -114,14 +118,16 @@ def _traced_call_func(name, fnc):
if r:
raise r
return ret
return _traced_call_int
def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_prefixes=[], only_prefix=[]):
def _patch_module(module, prefix="", basepath=None, basemodule=None, exclude_prefixes=[], only_prefix=[]):
if isinstance(module, str):
if basemodule is None:
basemodule = module + '.'
basemodule = module + "."
import importlib
importlib.import_module(module)
module = sys.modules.get(module)
if not module:
@@ -130,8 +136,8 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre
basepath = os.path.sep.join(module.__file__.split(os.path.sep)[:-1]) + os.path.sep
# only sub modules
if not hasattr(module, '__file__') or (inspect.ismodule(module) and not module.__file__.startswith(basepath)):
if hasattr(module, '__module__') and module.__module__.startswith(basemodule):
if not hasattr(module, "__file__") or (inspect.ismodule(module) and not module.__file__.startswith(basepath)):
if hasattr(module, "__module__") and module.__module__.startswith(basemodule):
# this is one of ours
pass
else:
@@ -139,25 +145,25 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre
return
# Do not patch ourselves
if hasattr(module, '__file__') and module.__file__ == __file__:
if hasattr(module, "__file__") and module.__file__ == __file__:
return
prefix += module.__name__.split('.')[-1] + '.'
prefix += module.__name__.split(".")[-1] + "."
# Do not patch low level network layer
if prefix.startswith('clearml.backend_api.session.') and prefix != 'clearml.backend_api.session.':
if not prefix.endswith('.Session.') and '.token_manager.' not in prefix:
if prefix.startswith("clearml.backend_api.session.") and prefix != "clearml.backend_api.session.":
if not prefix.endswith(".Session.") and ".token_manager." not in prefix:
# print('SKIPPING: {}'.format(prefix))
return
if prefix.startswith('clearml.backend_api.services.'):
if prefix.startswith("clearml.backend_api.services."):
return
for skip in exclude_prefixes:
if prefix.startswith(skip):
return
for fn in (m for m in dir(module) if not m.startswith('__')):
if fn in ('schema_property') or fn.startswith('_PostImportHookPatching__'):
for fn in (m for m in dir(module) if not m.startswith("__")):
if fn in ("schema_property",) or fn.startswith("_PostImportHookPatching__"):
continue
# noinspection PyBroadException
try:
@@ -165,27 +171,39 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre
except Exception:
continue
if inspect.ismodule(fnc):
_patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule,
exclude_prefixes=exclude_prefixes, only_prefix=only_prefix)
_patch_module(
fnc,
prefix=prefix,
basepath=basepath,
basemodule=basemodule,
exclude_prefixes=exclude_prefixes,
only_prefix=only_prefix,
)
elif inspect.isclass(fnc):
_patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule,
exclude_prefixes=exclude_prefixes, only_prefix=only_prefix)
_patch_module(
fnc,
prefix=prefix,
basepath=basepath,
basemodule=basemodule,
exclude_prefixes=exclude_prefixes,
only_prefix=only_prefix,
)
elif inspect.isroutine(fnc):
if only_prefix and all(p not in (prefix+str(fn)) for p in only_prefix):
if only_prefix and all(p not in (prefix + str(fn)) for p in only_prefix):
continue
for skip in exclude_prefixes:
if (prefix+str(fn)).startswith(skip):
if (prefix + str(fn)).startswith(skip):
continue
# _log_stderr('Patching: {}'.format(prefix+fn))
if inspect.isclass(module):
# check if this is even in our module
if hasattr(fnc, '__module__') and fnc.__module__ != module.__module__:
if hasattr(fnc, "__module__") and fnc.__module__ != module.__module__:
pass # print('not ours {} {}'.format(module, fnc))
elif hasattr(fnc, '__qualname__') and fnc.__qualname__.startswith(module.__name__ + '.'):
elif hasattr(fnc, "__qualname__") and fnc.__qualname__.startswith(module.__name__ + "."):
if isinstance(module.__dict__[fn], classmethod):
setattr(module, fn, _traced_call_cls(prefix + fn, fnc))
elif isinstance(module.__dict__[fn], staticmethod):
@@ -194,7 +212,7 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre
setattr(module, fn, _traced_call_method(prefix + fn, fnc))
else:
# probably not ours hopefully static function
if hasattr(fnc, '__qualname__') and not fnc.__qualname__.startswith(module.__name__ + '.'):
if hasattr(fnc, "__qualname__") and not fnc.__qualname__.startswith(module.__name__ + "."):
pass # print('not ours {} {}'.format(module, fnc))
else:
# we should not get here
@@ -226,17 +244,18 @@ def trace_trains(stream=None, level=1, exclude_prefixes=[], only_prefix=[]):
return
__patched_trace = True
if not __thread_id:
if sys.platform == 'linux':
if sys.platform == "linux":
import ctypes
__thread_so = ctypes.cdll.LoadLibrary('libc.so.6')
__thread_so = ctypes.cdll.LoadLibrary("libc.so.6")
__thread_id = _thread_linux_id
else:
__thread_id = _thread_py_id
stderr_write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
stderr_write = sys.stderr._original_write if hasattr(sys.stderr, "_original_write") else sys.stderr.write
if stream:
if isinstance(stream, str):
stream = open(stream, 'w')
stream = open(stream, "w")
__stream_write = stream.write
__stream_flush = stream.flush
else:
@@ -244,15 +263,16 @@ def trace_trains(stream=None, level=1, exclude_prefixes=[], only_prefix=[]):
__stream_flush = None
from ..version import __version__
msg = 'ClearML v{} - Starting Trace\n\n'.format(__version__)
msg = "ClearML v{} - Starting Trace\n\n".format(__version__)
# print to actual stderr
stderr_write(msg)
# store to stream
__stream_write(msg)
__stream_write('{:9}:{:5}:{:8}: {:14}\n'.format('seconds', 'pid', 'tid', 'self'))
__stream_write('{:9}:{:5}:{:8}:{:15}\n'.format('-' * 9, '-' * 5, '-' * 8, '-' * 15))
__stream_write("{:9}:{:5}:{:8}: {:14}\n".format("seconds", "pid", "tid", "self"))
__stream_write("{:9}:{:5}:{:8}:{:15}\n".format("-" * 9, "-" * 5, "-" * 8, "-" * 15))
__trace_start = time.time()
_patch_module('clearml', exclude_prefixes=exclude_prefixes or [], only_prefix=only_prefix or [])
_patch_module("clearml", exclude_prefixes=exclude_prefixes or [], only_prefix=only_prefix or [])
def trace_level(level=1):
@@ -286,25 +306,25 @@ def print_traced_files(glob_mask, lines_per_tid=5, stream=sys.stdout, specify_pi
from glob import glob
def hash_line(a_line):
return hash(':'.join(a_line.split(':')[1:]))
return hash(":".join(a_line.split(":")[1:]))
pids = {}
orphan_calls = set()
print_orphans = False
for fname in glob(glob_mask, recursive=False):
with open(fname, 'rt') as fd:
with open(fname, "rt") as fd:
lines = fd.readlines()
for line in lines:
# noinspection PyBroadException
try:
_, pid, tid = line.split(':')[:3]
_, pid, tid = line.split(":")[:3]
pid = int(pid)
except Exception:
continue
if specify_pids and pid not in specify_pids:
continue
if line.startswith('-'):
if line.startswith("-"):
print_orphans = True
line = line[1:]
h = hash_line(line)
@@ -323,16 +343,16 @@ def print_traced_files(glob_mask, lines_per_tid=5, stream=sys.stdout, specify_pi
by_time = {}
for p, tids in pids.items():
for t, lines in tids.items():
ts = float(lines[-1].split(':')[0].strip()) + 0.000001 * len(by_time)
ts = float(lines[-1].split(":")[0].strip()) + 0.000001 * len(by_time)
if print_orphans:
for i, line in enumerate(lines):
if i > 0 and hash_line(line) in orphan_calls:
lines[i] = ' ### Orphan ### {}'.format(line)
by_time[ts] = ''.join(lines) + '\n'
lines[i] = " ### Orphan ### {}".format(line)
by_time[ts] = "".join(lines) + "\n"
out_stream = open(stream, 'w') if isinstance(stream, str) else stream
out_stream = open(stream, "w") if isinstance(stream, str) else stream
for k in sorted(by_time.keys()):
out_stream.write(by_time[k] + '\n')
out_stream.write(by_time[k] + "\n")
if isinstance(stream, str):
out_stream.close()
@@ -345,11 +365,11 @@ def end_of_program():
def stdout_print(*args, **kwargs):
if len(args) == 1 and not kwargs:
line = str(args[0])
if not line.endswith('\n'):
line += '\n'
if not line.endswith("\n"):
line += "\n"
else:
line = '{} {}\n'.format(args or '', kwargs or '')
if hasattr(sys.stdout, '_original_write'):
line = "{} {}\n".format(args or "", kwargs or "")
if hasattr(sys.stdout, "_original_write"):
sys.stdout._original_write(line)
else:
sys.stdout.write(line)
@@ -361,16 +381,18 @@ def debug_print(*args, **kwargs):
Example: [pid=123, t=0.003] message here
"""
global tic
tic = globals().get('tic', time.time())
tic = globals().get("tic", time.time())
stdout_print(
"\033[1;33m[pid={}, t={:.04f}] ".format(os.getpid(), time.time()-tic)
+ str(args[0] if len(args) == 1 else ("" if not args else args)) + "\033[0m", **kwargs
)
"\033[1;33m[pid={}, t={:.04f}] ".format(os.getpid(), time.time() - tic)
+ str(args[0] if len(args) == 1 else ("" if not args else args))
+ "\033[0m",
**kwargs
)
tic = time.time()
if __name__ == '__main__':
if __name__ == "__main__":
# from clearml import Task
# task = Task.init(project_name="examples", task_name="trace test")
# trace_trains('_trace.txt', level=2)
print_traced_files('_trace_*.txt', lines_per_tid=10)
print_traced_files("_trace_*.txt", lines_per_tid=10)
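The parsing loop in `print_traced_files` splits every trace line on `:` and skips anything whose second field is not an integer pid. A minimal sketch of that grouping step follows, assuming the `seconds:pid:tid: call` layout written by `trace_trains`. The helper name `group_trace_lines` and the tail-trimming behaviour are illustrative assumptions, not the module's actual code.

```python
from collections import defaultdict


def group_trace_lines(lines, lines_per_tid=5):
    """Group trace lines by pid and tid, keeping only the last few per thread."""
    grouped = defaultdict(lambda: defaultdict(list))
    for line in lines:
        try:
            # Assumed layout: "<seconds>:<pid>:<tid>: <call>"
            _, pid, tid = line.split(":")[:3]
            pid = int(pid)
        except ValueError:
            # Header, separator, or malformed line: skip it
            continue
        bucket = grouped[pid][tid.strip()]
        bucket.append(line)
        # Drop the oldest entries so each thread keeps a short tail
        del bucket[:-lines_per_tid]
    return {pid: dict(tids) for pid, tids in grouped.items()}


sample = [
    "seconds  :pid  :tid     : self\n",
    "0.001    :  42:       7: a()\n",
    "0.002    :  42:       7: b()\n",
    "0.003    :  42:       8: c()\n",
    "garbage\n",
]
tail = group_trace_lines(sample, lines_per_tid=1)
```

Catching `ValueError` covers both a non-numeric pid and a line with fewer than three `:`-separated fields, which is why the header and separator rows fall out without a special case.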


@@ -230,7 +230,7 @@ class Logger(object):
:param xaxis: The x-axis title. (Optional)
:param yaxis: The y-axis title. (Optional)
:param mode: Multiple histograms mode, stack / group / relative. Default is 'group'.
:param extra_layout: optional dictionary for layout configuration, passed directly to plotly
:param extra_layout: Optional dictionary for layout configuration, passed directly to plotly.
See full details on the supported configuration: https://plotly.com/javascript/reference/layout/
example: ``extra_layout={'showlegend': False, 'plot_bgcolor': 'yellow'}``
"""
@@ -266,6 +266,7 @@ class Logger(object):
data_args=None, # type: Optional[dict]
extra_layout=None, # type: Optional[dict]
):
# type: (...) -> ()
"""
For explicit reporting, plot a (default grouped) histogram.
Notice this function will not calculate the histogram,
@@ -590,12 +591,13 @@ class Logger(object):
comment=None, # type: Optional[str]
extra_layout=None, # type: Optional[dict]
):
# type: (...) -> ()
"""
For explicit reporting, plot a 3d scatter graph (with markers).
:param str title: The title (metric) of the plot.
:param str series: The series name (variant) of the reported scatter plot.
:param Union[numpy.ndarray, list] scatter: The scatter data.
:param scatter: The scatter data.
list of (pairs of x,y,z), list of series [[(x1,y1,z1)...]], or numpy.ndarray
:param int iteration: The reported iteration / step.
:param str xaxis: The x-axis title. (Optional)


@@ -818,12 +818,13 @@ class BaseModel(object):
comment=None, # type: Optional[str]
extra_layout=None # type: Optional[dict]
):
# type: (...) -> ()
"""
For explicit reporting, plot a 3d scatter graph (with markers).
:param str title: The title (metric) of the plot.
:param str series: The series name (variant) of the reported scatter plot.
:param Union[numpy.ndarray, list] scatter: The scatter data.
:param scatter: The scatter data.
list of (pairs of x,y,z), list of series [[(x1,y1,z1)...]], or numpy.ndarray
:param int iteration: The reported iteration / step.
:param str xaxis: The x-axis title. (Optional)


@@ -0,0 +1,236 @@
import copy
import time
import uuid
from threading import Thread
from ..task import Task
from ..utilities.resource_monitor import ResourceMonitor
class EndpointTelemetry:
BACKEND_STAT_MAP = {
"cpu_usage_*": "cpu_usage",
"cpu_temperature_*": "cpu_temperature",
"disk_free_percent": "disk_free_home",
"io_read_mbs": "disk_read",
"io_write_mbs": "disk_write",
"network_tx_mbs": "network_tx",
"network_rx_mbs": "network_rx",
"memory_free_gb": "memory_free",
"memory_used_gb": "memory_used",
"gpu_temperature_*": "gpu_temperature",
"gpu_mem_used_gb_*": "gpu_memory_used",
"gpu_mem_free_gb_*": "gpu_memory_free",
"gpu_utilization_*": "gpu_usage",
}
def __init__(
self,
endpoint_name="endpoint",
model_name="model",
model=None,
model_url=None,
model_source=None,
model_version=None,
app_id=None,
app_instance=None,
tags=None,
system_tags=None,
container_id=None,
input_size=None,
input_type="str",
report_statistics=True,
endpoint_url=None,
preprocess_artifact=None,
force_register=False,
):
self.report_window = 30
self._previous_readouts = {}
self._previous_readouts_ts = time.time()
self._num_readouts = 0
self.container_info = {
"container_id": container_id or str(uuid.uuid4()).replace("-", ""),
"endpoint_name": endpoint_name,
"model_name": model_name,
"model_source": model_source,
"model_version": model_version,
"preprocess_artifact": preprocess_artifact,
"input_type": str(input_type),
"input_size": str(input_size),
"tags": tags,
"system_tags": system_tags,
"endpoint_url": endpoint_url,
}
references = []
if app_id:
references.append({"type": "app_id", "value": app_id})
if app_instance:
references.append({"type": "app_instance", "value": app_instance})
references.append({"type": "task", "value": Task.current_task().id})
if model:
references.append({"type": "model", "value": model})
if model_url:
references.append({"type": "url", "value": model_url})
self.container_info["reference"] = references
self.session = Task._get_default_session()
self.requests_num = 0
self.requests_num_window = 0
self.requests_num_prev_window = 0
self.latency_sum_window = 0
self.uptime_timestamp = time.time()
self.last_request_time = None
# use readily available resource monitor, otherwise create one (can happen in spawned subprocesses)
self.resource_monitor = Task.current_task()._resource_monitor or ResourceMonitor(Task.current_task())
if not container_id and not force_register:
self.register_container()
self._stop_container_status_report_daemon = False
if report_statistics:
Thread(target=self.container_status_report_daemon, daemon=True).start()
def stop(self):
self._stop_container_status_report_daemon = True
def update(
self,
endpoint_name=None,
model_name=None,
model=None,
model_url=None,
model_source=None,
model_version=None,
tags=None,
system_tags=None,
input_size=None,
input_type=None,
endpoint_url=None,
preprocess_artifact=None,
):
update_dict = {}
if endpoint_name is not None:
update_dict["endpoint_name"] = endpoint_name
if model_name is not None:
update_dict["model_name"] = model_name
if model_source is not None:
update_dict["model_source"] = model_source
if model_version is not None:
update_dict["model_version"] = model_version
if preprocess_artifact is not None:
update_dict["preprocess_artifact"] = preprocess_artifact
if input_type is not None:
update_dict["input_type"] = input_type
if input_size is not None:
update_dict["input_size"] = input_size
if tags is not None:
update_dict["tags"] = tags
if system_tags is not None:
update_dict["system_tags"] = system_tags
if endpoint_url is not None:
update_dict["endpoint_url"] = endpoint_url
self.container_info.update(update_dict)
references_to_add = {}
if model:
references_to_add["model"] = {"type": "model", "value": model}
if model_url:
# keyed by reference type so an existing "url" reference is updated instead of duplicated
references_to_add["url"] = {"type": "url", "value": model_url}
for reference in self.container_info["reference"]:
if reference["type"] in references_to_add:
reference["value"] = references_to_add[reference["type"]]["value"]
references_to_add.pop(reference["type"], None)
self.container_info["reference"].extend(list(references_to_add.values()))
def register_container(self):
result = self.session.send_request("serving", "register_container", json=self.container_info)
if result.status_code != 200:
print("Failed registering container: {}".format(result.json()))
def wait_for_endpoint_url(self):
while not self.container_info.get("endpoint_url"):
Task.current_task().reload()
endpoint = Task.current_task()._get_runtime_properties().get("endpoint")
if endpoint:
self.container_info["endpoint_url"] = endpoint
self.uptime_timestamp = time.time()
else:
time.sleep(1)
def get_machine_stats(self):
def create_general_key(old_key):
return "{}_*".format(old_key)
stats = self.resource_monitor._machine_stats()
elapsed = time.time() - self._previous_readouts_ts
self._previous_readouts_ts = time.time()
updates = {}
for k, v in stats.items():
if k.endswith("_mbs"):
v = (v - self._previous_readouts.get(k, v)) / elapsed
updates[k] = v
self._previous_readouts = copy.deepcopy(stats)
stats.update(updates)
self._num_readouts += 1
preprocessed_stats = {}
ordered_keys = sorted(stats.keys())
for k in ordered_keys:
v = stats[k]
if k in ["memory_used_gb", "memory_free_gb"]:
v *= 1024
if isinstance(v, float):
v = round(v, 3)
stat_key = self.BACKEND_STAT_MAP.get(k)
if stat_key:
preprocessed_stats[stat_key] = v
else:
general_key = create_general_key(k)
if general_key.startswith("gpu"):
prev_general_key = general_key
general_key = "_".join(["gpu"] + general_key.split("_")[2:])
if general_key == "gpu_mem_used_gb_*":
gpu_index = prev_general_key.split("_")[1]
mem_usage = min(stats["gpu_{}_mem_usage".format(gpu_index)], 99.99)
mem_free = stats["gpu_{}_mem_free_gb".format(gpu_index)]
v = (mem_usage * mem_free) / (100 - mem_usage)
if general_key in ["gpu_mem_used_gb_*", "gpu_mem_free_gb_*"]:
v *= 1024
general_key = self.BACKEND_STAT_MAP.get(general_key)
if general_key:
preprocessed_stats.setdefault(general_key, []).append(v)
return preprocessed_stats
def container_status_report_daemon(self):
while not self._stop_container_status_report_daemon:
self.container_status_report()
time.sleep(self.report_window)
def container_status_report(self):
self.wait_for_endpoint_url()
status_report = {**self.container_info}
status_report["uptime_sec"] = int(time.time() - self.uptime_timestamp)
status_report["requests_num"] = self.requests_num
status_report["requests_min"] = self.requests_num_window + self.requests_num_prev_window
status_report["latency_ms"] = (
0 if (self.requests_num_window == 0) else (self.latency_sum_window / self.requests_num_window)
)
status_report["machine_stats"] = self.get_machine_stats()
self.requests_num_prev_window = self.requests_num_window
self.requests_num_window = 0
self.latency_sum_window = 0
self.latency_num_window = 0
result = self.session.send_request("serving", "container_status_report", json=status_report)
if result.status_code != 200:
print("Failed sending status report: {}".format(result.json()))
def update_last_request_time(self):
self.last_request_time = time.time()
def update_statistics(self):
self.requests_num += 1
self.requests_num_window += 1
latency = (time.time() - self.last_request_time) * 1000
self.latency_sum_window += latency
def on_request(self):
self.update_last_request_time()
def on_response(self):
self.update_statistics()
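The request counters in `EndpointTelemetry` roll over once per report. With the 30-second `report_window`, adding the current and previous window gives an approximate requests-per-minute figure, and latency is averaged over the current window only. The sketch below isolates that bookkeeping. The class name `WindowedRequestStats` is a hypothetical stand-in, and latency is passed in directly rather than measured with `time.time()`.

```python
class WindowedRequestStats:
    """Hypothetical stand-in for the request counters kept by EndpointTelemetry."""

    def __init__(self):
        self.requests_num = 0
        self.requests_num_window = 0
        self.requests_num_prev_window = 0
        self.latency_sum_window = 0.0

    def record(self, latency_ms):
        # Called once per completed request
        self.requests_num += 1
        self.requests_num_window += 1
        self.latency_sum_window += latency_ms

    def report(self):
        # Called once per reporting window: take a snapshot, then roll the window
        window = self.requests_num_window
        snapshot = {
            "requests_num": self.requests_num,
            "requests_min": window + self.requests_num_prev_window,
            "latency_ms": self.latency_sum_window / window if window else 0,
        }
        self.requests_num_prev_window = window
        self.requests_num_window = 0
        self.latency_sum_window = 0.0
        return snapshot


stats = WindowedRequestStats()
stats.record(10.0)
stats.record(30.0)
first = stats.report()
stats.record(5.0)
second = stats.report()
third = stats.report()
```

A window with no requests reports a latency of 0 rather than dividing by zero, matching the guard in `container_status_report`.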


@@ -0,0 +1,245 @@
import functools
import threading
from multiprocessing import Process
from typing import Optional
import httpx
import uvicorn
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.routing import Match
from .route import Route
from ..utilities.process.mp import SafeQueue
class FastAPIProxy:
ALL_REST_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
def __init__(self, port, workers=None, default_target=None, log_level=None, access_log=None, enable_streaming=True):
self.app = None
self.routes = {}
self.port = port
self.message_queue = SafeQueue()
self.uvicorn_subprocess = None
self.workers = workers
self.access_log = access_log
self.log_level = log_level
self.enable_streaming = enable_streaming
self._default_target = default_target
self._default_session = None
self._in_subprocess = False
def _create_default_route(self):
proxy = self
class DefaultRouteMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
scope = {
"type": "http",
"method": request.method,
"path": request.url.path,
"root_path": "",
"headers": request.headers.raw,
"query_string": request.url.query.encode("utf-8"),
"client": request.client,
"server": request.scope.get("server"),
"scheme": request.url.scheme,
"extensions": request.scope.get("extensions", {}),
"app": request.scope.get("app"),
}
for route in proxy.app.router.routes:
if route.matches(scope)[0] == Match.FULL:
return await call_next(request)
proxied_response = await proxy._send_request(
request, proxy._default_session, proxy._default_target + request.url.path
)
return await proxy._convert_httpx_response_to_fastapi(proxied_response)
self.app.add_middleware(DefaultRouteMiddleware)
async def proxy(
self,
request: Request,
path: Optional[str] = None,
source_path: Optional[str] = None,
):
route_data = self.routes.get(source_path)
if not route_data:
return Response(status_code=404)
request = await route_data.on_request(request)
try:
proxied_response = await self._send_request(
request, route_data.session, url=f"{route_data.target_url}/{path}" if path else route_data.target_url
)
proxied_response = await self._convert_httpx_response_to_fastapi(proxied_response)
except Exception as e:
await route_data.on_error(request, e)
raise
return await route_data.on_response(proxied_response, request)
async def _send_request(self, request, session, url):
if not self.enable_streaming:
proxied_response = await session.request(
method=request.method,
url=url,
headers=dict(request.headers),
content=await request.body(),
params=request.query_params
)
else:
request = session.build_request(
method=request.method,
url=url,
content=request.stream(),
params=request.query_params,
headers=dict(request.headers),
timeout=httpx.USE_CLIENT_DEFAULT
)
proxied_response = await session.send(
request=request,
auth=httpx.USE_CLIENT_DEFAULT,
follow_redirects=httpx.USE_CLIENT_DEFAULT,
stream=True,
)
return proxied_response
async def _convert_httpx_response_to_fastapi(self, httpx_response):
if self.enable_streaming and httpx_response.headers.get("transfer-encoding", "").lower() == "chunked":
async def upstream_body_generator():
async for chunk in httpx_response.aiter_bytes():
yield chunk
return StreamingResponse(
upstream_body_generator(), status_code=httpx_response.status_code, headers=dict(httpx_response.headers)
)
if not self.enable_streaming:
content = httpx_response.content
else:
content = await httpx_response.aread()
fastapi_response = Response(
content=content,
status_code=httpx_response.status_code,
media_type=httpx_response.headers.get("content-type", None),
headers=dict(httpx_response.headers),
)
# should delete content-length when not present in the original response
# relevant for:
# https://datatracker.ietf.org/doc/html/rfc9112#body.content-length:~:text=MUST%20NOT%20send%20a%20Content%2DLength%20header
if httpx_response.headers.get("content-length") is None:
try:
del fastapi_response.headers["content-length"] # no pop available
except Exception:
pass
return fastapi_response
def add_route(
self,
source,
target,
request_callback=None,
response_callback=None,
error_callback=None,
endpoint_telemetry=True,
):
if not self._in_subprocess:
self.message_queue.put(
{
"method": "add_route",
"kwargs": {
"source": source,
"target": target,
"request_callback": request_callback,
"response_callback": response_callback,
"error_callback": error_callback,
"endpoint_telemetry": endpoint_telemetry,
},
}
)
return
should_add_route = False
if source not in self.routes:
should_add_route = True
else:
self.routes[source].stop_endpoint_telemetry()
self.routes[source] = Route(
target,
request_callback=request_callback,
response_callback=response_callback,
error_callback=error_callback,
session=httpx.AsyncClient(timeout=None),
)
if endpoint_telemetry is True:
endpoint_telemetry = {}
if endpoint_telemetry is not False:
self.routes[source].set_endpoint_telemetry_args(**endpoint_telemetry)
if self._in_subprocess:
self.routes[source].start_endpoint_telemetry()
if should_add_route:
self.app.add_api_route(
source,
functools.partial(
self.proxy,
source_path=source,
),
methods=self.ALL_REST_METHODS,
)
self.app.add_api_route(
source.rstrip("/") + "/{path:path}",
functools.partial(
self.proxy,
source_path=source,
),
methods=self.ALL_REST_METHODS,
)
return self.routes[source]
def remove_route(self, source):
if not self._in_subprocess:
self.message_queue.put({"method": "remove_route", "kwargs": {"source": source}})
return
route = self.routes.get(source)
if route:
route.stop_endpoint_telemetry()
if source in self.routes:
# we are not popping the key to prevent calling self.app.add_api_route multiple times
# when self.add_route is called on the same source_path after removal
self.routes[source] = None
def _start(self):
self._in_subprocess = True
self.app = FastAPI()
if self._default_target:
self._default_session = httpx.AsyncClient(timeout=None)
self._create_default_route()
for route in self.routes.values():
route.start_endpoint_telemetry()
threading.Thread(target=self._rpc_manager, daemon=True).start()
uvicorn.run(
self.app,
port=self.port,
host="0.0.0.0",
workers=self.workers,
log_level=self.log_level,
access_log=self.access_log,
)
def _rpc_manager(self):
while True:
message = self.message_queue.get()
if message["method"] == "add_route":
self.add_route(**message["kwargs"])
elif message["method"] == "remove_route":
self.remove_route(**message["kwargs"])
def start(self):
self.uvicorn_subprocess = Process(target=self._start)
self.uvicorn_subprocess.start()
def stop(self):
if self.uvicorn_subprocess:
self.uvicorn_subprocess.terminate()
self.uvicorn_subprocess = None
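`FastAPIProxy.add_route` and `remove_route` do not touch the running server when called from the parent process. They enqueue a message, and `_rpc_manager` replays it inside the uvicorn subprocess. The sketch below shows the same dispatch pattern with the standard library only. It assumes a thread and `queue.Queue` in place of the subprocess and `SafeQueue`, and `RouteTable`, `request_add`, `request_remove`, and the `None` sentinel are hypothetical names added for illustration.

```python
import queue
import threading


class RouteTable:
    """Hypothetical, simplified stand-in for the proxy's route bookkeeping."""

    def __init__(self):
        self.routes = {}
        self.messages = queue.Queue()

    # Caller side: only enqueue a description of the call
    def request_add(self, source, target):
        self.messages.put({"method": "add_route", "kwargs": {"source": source, "target": target}})

    def request_remove(self, source):
        self.messages.put({"method": "remove_route", "kwargs": {"source": source}})

    # Worker side: apply queued calls to the local state
    def _add_route(self, source, target):
        self.routes[source] = target

    def _remove_route(self, source):
        if source in self.routes:
            self.routes[source] = None  # keep the key, mirroring the diff

    def serve(self):
        handlers = {"add_route": self._add_route, "remove_route": self._remove_route}
        while True:
            message = self.messages.get()
            if message is None:  # sentinel added for this sketch so the loop can exit
                break
            handlers[message["method"]](**message["kwargs"])


table = RouteTable()
worker = threading.Thread(target=table.serve, daemon=True)
worker.start()
table.request_add("/a", "http://localhost:8000")
table.request_add("/b", "http://localhost:8001")
table.request_remove("/a")
table.messages.put(None)
worker.join()
```

Because the queue preserves order, the removal of `/a` is applied after its addition, and the key stays in the table with a `None` value so a later re-add does not need to register the route a second time.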

clearml/router/proxy.py Normal file

@@ -0,0 +1,53 @@
from .fastapi_proxy import FastAPIProxy
class HttpProxy:
DEFAULT_PORT = 9000
def __init__(
self, port=None, workers=None, default_target=None, log_level=None, access_log=True, enable_streaming=True
):
# at the moment, only a fastapi proxy is supported
self.base_proxy = FastAPIProxy(
port or self.DEFAULT_PORT,
workers=workers,
default_target=default_target,
log_level=log_level,
access_log=access_log,
enable_streaming=enable_streaming,
)
self.base_proxy.start()
self.port = port or self.DEFAULT_PORT
self.routes = {}
def add_route(
self,
source,
target,
request_callback=None,
response_callback=None,
endpoint_telemetry=True,
error_callback=None,
):
self.routes[source] = self.base_proxy.add_route(
source=source,
target=target,
request_callback=request_callback,
response_callback=response_callback,
endpoint_telemetry=endpoint_telemetry,
error_callback=error_callback,
)
return self.routes[source]
def remove_route(self, source):
self.routes.pop(source, None)
self.base_proxy.remove_route(source)
def get_routes(self):
return self.routes
def start(self):
self.base_proxy.start()
def stop(self):
self.base_proxy.stop()

clearml/router/route.py Normal file

@@ -0,0 +1,93 @@
import inspect
from .endpoint_telemetry import EndpointTelemetry
class Route:
def __init__(self, target_url, request_callback=None, response_callback=None, session=None, error_callback=None):
self.target_url = target_url
self.request_callback = request_callback
self.response_callback = response_callback
self.error_callback = error_callback
self.session = session
self.persistent_state = {}
self._endpoint_telemetry = None
self._endpoint_telemetry_args = None
def set_endpoint_telemetry_args(
self,
endpoint_name="endpoint",
model_name="model",
model=None,
model_url=None,
model_source=None,
model_version=None,
app_id=None,
app_instance=None,
tags=None,
system_tags=None,
container_id=None,
input_size=None,
input_type="str",
report_statistics=True,
endpoint_url=None,
preprocess_artifact=None,
force_register=False
):
self._endpoint_telemetry_args = dict(
endpoint_name=endpoint_name,
model_name=model_name,
model=model,
model_url=model_url,
model_source=model_source,
model_version=model_version,
app_id=app_id,
app_instance=app_instance,
tags=tags,
system_tags=system_tags,
container_id=container_id,
input_size=input_size,
input_type=input_type,
report_statistics=report_statistics,
endpoint_url=endpoint_url,
preprocess_artifact=preprocess_artifact,
force_register=force_register
)
def start_endpoint_telemetry(self):
if self._endpoint_telemetry is not None or self._endpoint_telemetry_args is None:
return
self._endpoint_telemetry = EndpointTelemetry(**self._endpoint_telemetry_args)
def stop_endpoint_telemetry(self):
if self._endpoint_telemetry is None:
return
self._endpoint_telemetry.stop()
self._endpoint_telemetry = None
async def on_request(self, request):
new_request = request
if self.request_callback:
new_request = self.request_callback(request, persistent_state=self.persistent_state) or request
if inspect.isawaitable(new_request):
new_request = (await new_request) or request
if self._endpoint_telemetry:
self._endpoint_telemetry.on_request()
return new_request
async def on_response(self, response, request):
new_response = response
if self.response_callback:
new_response = self.response_callback(response, request, persistent_state=self.persistent_state) or response
if inspect.isawaitable(new_response):
new_response = (await new_response) or response
if self._endpoint_telemetry:
self._endpoint_telemetry.on_response()
return new_response
async def on_error(self, request, error):
on_error_result = None
if self.error_callback:
on_error_result = self.error_callback(request, error, persistent_state=self.persistent_state)
if inspect.isawaitable(on_error_result):
await on_error_result
return on_error_result
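`Route.on_request`, `on_response`, and `on_error` accept either plain functions or coroutine functions as callbacks, and treat a `None` return as "keep the original object". A minimal sketch of that pattern, using only the standard library, is shown here. `run_callback`, `sync_upper`, and `async_noop` are hypothetical helpers, and strings stand in for request objects.

```python
import asyncio
import inspect


async def run_callback(callback, value, state):
    """Call a sync or async callback; fall back to `value` if it returns nothing."""
    if callback is None:
        return value
    result = callback(value, persistent_state=state)
    if inspect.isawaitable(result):
        # Coroutine functions return an awaitable that must be resolved first
        result = await result
    return result or value


def sync_upper(value, persistent_state):
    persistent_state["calls"] = persistent_state.get("calls", 0) + 1
    return value.upper()


async def async_noop(value, persistent_state):
    persistent_state["calls"] = persistent_state.get("calls", 0) + 1
    # Returning None means "keep the original value"


async def demo():
    state = {}
    first = await run_callback(sync_upper, "ping", state)
    second = await run_callback(async_noop, "ping", state)
    third = await run_callback(None, "ping", state)
    return first, second, third, state


results = asyncio.run(demo())
```

The shared `state` dictionary plays the role of `persistent_state`: both callbacks see and mutate the same object across calls.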

clearml/router/router.py Normal file

@@ -0,0 +1,220 @@
from typing import Optional, Callable, Dict, List, Union  # noqa
from fastapi import Request, Response # noqa
from .proxy import HttpProxy
class HttpRouter:
"""
A router class to manage HTTP routing for an application.
Allows the creation, deployment, and management of local and external endpoints,
as well as the configuration of a local proxy for traffic routing.
Example usage:
.. code-block:: py
import copy
import time
import urllib.parse
from clearml import Task
from fastapi import Response
def request_callback(request, persistent_state):
persistent_state["last_request_time"] = time.time()
def response_callback(response, request, persistent_state):
print("Latency:", time.time() - persistent_state["last_request_time"])
if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify":
new_content = response.body.replace(b"modify", b"modified")
headers = copy.deepcopy(response.headers)
headers["Content-Length"] = str(len(new_content))
return Response(status_code=response.status_code, headers=headers, content=new_content)
router = Task.current_task().get_http_router()
router.set_local_proxy_parameters(incoming_port=9000)
router.create_local_route(
source="/",
target="http://localhost:8000",
request_callback=request_callback,
response_callback=response_callback,
endpoint_telemetry={"model": "MyModel"}
)
router.deploy(wait=True)
"""
_instance = None
def __init__(self, task):
"""
Do not use directly. Use `Task.get_http_router` instead
"""
self._task = task
self._external_endpoint_port = None
self._proxy = None
self._proxy_params = {"port": HttpProxy.DEFAULT_PORT, "access_log": True}
def set_local_proxy_parameters(
self, incoming_port=None, default_target=None, log_level=None, access_log=True, enable_streaming=True
):
# type: (Optional[int], Optional[str], Optional[str], bool, bool) -> ()
"""
Set the parameters with which the local proxy is initialized
:param incoming_port: The incoming port of the proxy
:param default_target: If None, no default target is set. Otherwise, route all traffic
that doesn't match a local route created via `create_local_route` to this target
:param log_level: Python log level for the proxy, one of:
'critical', 'error', 'warning', 'info', 'debug', 'trace'
:param access_log: Enable/Disable access log
:param enable_streaming: If True, stream responses whose `transfer-encoding` header is `chunked`.
If False, no response will be streamed
"""
self._proxy_params["port"] = incoming_port or HttpProxy.DEFAULT_PORT
self._proxy_params["default_target"] = default_target
self._proxy_params["log_level"] = log_level
self._proxy_params["access_log"] = access_log
self._proxy_params["enable_streaming"] = enable_streaming
def start_local_proxy(self):
"""
Start the local proxy without deploying the router, i.e. requesting an external endpoint
"""
self._proxy = self._proxy or HttpProxy(**self._proxy_params)
def create_local_route(
self,
source, # type: str
target, # type: str
request_callback=None, # type: Callable[Request, Dict]
response_callback=None, # type: Callable[Response, Request, Dict]
endpoint_telemetry=True, # type: Union[bool, Dict]
error_callback=None, # type: Callable[Request, Exception, Dict]
):
"""
Create a local route from a source to a target through a proxy. If no proxy instance
exists, one is automatically created.
This function enables routing traffic between the source and target endpoints, optionally
allowing custom callbacks to handle requests and responses or to gather telemetry data
at the endpoint.
To customize proxy parameters, use the `HttpRouter.set_local_proxy_parameters` method.
By default, the proxy binds to port 9000 for incoming requests.
:param source: The source path for routing the traffic. For example, `/` will intercept
all the traffic sent to the proxy, while `/example` will only intercept the calls
that have `/example` as the path prefix.
:param target: The target URL where the intercepted traffic is routed.
:param request_callback: A function used to process each request before it is forwarded to the target.
The callback must have the following parameters:
- request - The intercepted FastAPI request
- persistent_state - A dictionary meant to be used as a caching utility object.
Shared with `response_callback` and `error_callback`
The callback can return a FastAPI Request, in which case this request will be forwarded to the target
:param response_callback: A function used to process each response before it is returned by the proxy.
The callback must have the following parameters:
- response - The FastAPI response
- request - The FastAPI request (after being preprocessed by the proxy)
- persistent_state - A dictionary meant to be used as a caching utility object.
Shared with `request_callback` and `error_callback`
The callback can return a FastAPI Response, in which case this response will be forwarded
:param endpoint_telemetry: If True, enable endpoint telemetry. If False, disable it.
If a dictionary is passed, enable endpoint telemetry with custom parameters.
The parameters are:
- endpoint_url - URL to the endpoint, mandatory if no external URL has been requested
- endpoint_name - name of the endpoint
- model_name - name of the model served by the endpoint
- model - referenced model
- model_url - URL to the model
- model_source - Source of the model
- model_version - Model version
- app_id - App ID, if used inside a ClearML app
- app_instance - The ID of the instance the ClearML app is running
- tags - ClearML tags
- system_tags - ClearML system tags
- container_id - Container ID, should be unique
- input_size - input size of the model
- input_type - input type expected by the model/endpoint
- report_statistics - whether or not to report statistics
:param error_callback: Callback to be called on request error.
The callback must have the following parameters:
- request - the FastAPI request which caused the error
- error - an exception which indicates which error occurred
- persistent_state - A dictionary meant to be used as a caching utility object.
Shared with `request_callback` and `response_callback`
"""
self.start_local_proxy()
self._proxy.add_route(
source,
target,
request_callback=request_callback,
response_callback=response_callback,
endpoint_telemetry=endpoint_telemetry,
error_callback=error_callback,
)
def remove_local_route(self, source):
# type: (str) -> ()
"""
Remove a local route. If endpoint telemetry is enabled for that route, disable it
:param source: Remove route based on the source path used to route the traffic
"""
if self._proxy:
self._proxy.remove_route(source)
def deploy(self, wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0):
# type: (bool, float, float) -> Optional[Dict]
"""
Start the local HTTP proxy and request an external endpoint for an application.
To control the proxy parameters, use `HttpRouter.set_local_proxy_parameters`.
To create local routes through the proxy, use `HttpRouter.create_local_route`.
By default, the incoming port bound by the proxy is 9000. As of now, only the `http` protocol is supported.
:param wait: If True, wait for the endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait and None will be returned
:return: If wait is False, this method will return None.
If no endpoint could be found while waiting, this method returns None.
Otherwise, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
self._proxy = self._proxy or HttpProxy(**self._proxy_params)
return self._task.request_external_endpoint(
port=self._proxy.port,
protocol="http",
wait=wait,
wait_interval_seconds=wait_interval_seconds,
wait_timeout_seconds=wait_timeout_seconds,
)
def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0):
# type: (float, float) -> Optional[Dict]
"""
Wait for an external endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait
:return: If no endpoint could be found while waiting, this method returns None.
Otherwise, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
return self._task.wait_for_external_endpoint(
protocol="http", wait_interval_seconds=wait_interval_seconds, wait_timeout_seconds=wait_timeout_seconds
)
def list_external_endpoints(self):
# type: () -> List[Dict]
"""
List all external endpoints assigned
:return: A list of dictionaries. Each dictionary contains the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
return self._task.list_external_endpoints(protocol="http")
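
The `wait_interval_seconds` and `wait_timeout_seconds` parameters documented above describe a poll-until-timeout loop. The following is a minimal sketch of that pattern, not the ClearML implementation: `poll_until`, `fetch`, and the injectable `clock` and `sleep` arguments are hypothetical names introduced here for illustration only.

```python
import time


def poll_until(fetch, interval_seconds=3.0, timeout_seconds=90.0, clock=time.time, sleep=time.sleep):
    """Call `fetch` until it returns a truthy value or the timeout is exceeded.

    Returns the fetched value, or None if the timeout was exceeded first.
    `fetch`, `clock` and `sleep` are illustrative hooks, not ClearML API.
    """
    start = clock()
    while True:
        value = fetch()
        if value:
            return value
        # give up once the time budget is spent, mirroring the "return None" contract
        if clock() >= start + timeout_seconds:
            return None
        sleep(interval_seconds)


if __name__ == "__main__":
    # hypothetical fetcher: the endpoint only shows up on the third poll
    answers = iter([None, None, {"port": 9000, "protocol": "http"}])
    print(poll_until(lambda: next(answers), interval_seconds=0.01, timeout_seconds=1.0))
```

Passing the clock and sleep functions as arguments is a design choice made for this sketch so that the loop can be exercised without real delays.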

View File

@@ -20,9 +20,7 @@ from ..utilities.files import get_filename_max_length
class CacheManager(object):
__cache_managers = {}
_default_cache_file_limit = deferred_config(
"storage.cache.default_cache_manager_size", 100
)
_default_cache_file_limit = deferred_config("storage.cache.default_cache_manager_size", 100)
_storage_manager_folder = "storage_manager"
_default_context = "global"
_local_to_remote_url_lookup = OrderedDict()
@@ -48,9 +46,7 @@ class CacheManager(object):
self._file_limit = max(self._file_limit, int(cache_file_limit))
return self._file_limit
def get_local_copy(
self, remote_url, force_download, skip_zero_size_check=False
):
def get_local_copy(self, remote_url, force_download, skip_zero_size_check=False):
# type: (str, bool, bool) -> Optional[str]
helper = StorageHelper.get(remote_url)
@@ -64,9 +60,7 @@ class CacheManager(object):
# noinspection PyProtectedMember
direct_access = helper.get_driver_direct_access(remote_url)
except (OSError, ValueError):
LoggerRoot.get_base_logger().debug(
"Failed accessing local file: {}".format(remote_url)
)
LoggerRoot.get_base_logger().debug("Failed accessing local file: {}".format(remote_url))
return None
if direct_access:
@@ -77,6 +71,8 @@ class CacheManager(object):
if cached_size is not None and not force_download:
CacheManager._add_remote_url(remote_url, cached_file)
return cached_file
self.clean_cache()
# we need to download the file:
downloaded_file = helper.download_to_file(
remote_url,
@@ -131,12 +127,12 @@ class CacheManager(object):
file_ext = "".join(Path(file_name).suffixes[-2:])
file_ext = file_ext.rstrip(" ")
file_basename = file_name[:-len(file_ext)]
file_basename = file_name[: -len(file_ext)]
file_basename = file_basename.strip()
# Omit characters from extensions
if len(file_ext) > allowed_length:
file_ext = file_ext[-(allowed_length - 1):]
file_ext = file_ext[-(allowed_length - 1) :]
file_ext = "." + file_ext.lstrip(".")
# Updating maximum character length
@@ -159,9 +155,7 @@ class CacheManager(object):
"""
:return: full path to current contexts cache folder
"""
folder = Path(
get_cache_dir() / CacheManager._storage_manager_folder / self._context
)
folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
return folder.as_posix()
def get_cache_file(self, remote_url=None, local_filename=None):
@@ -171,32 +165,7 @@ class CacheManager(object):
:param local_filename: if local_file is given, search for the local file/directory in the cache folder
:return: full path to file name, current file size or None
"""
def safe_time(x):
# noinspection PyBroadException
try:
return x.stat().st_mtime
except Exception:
return 0
def sort_max_access_time(x):
atime = safe_time(x)
# noinspection PyBroadException
try:
if x.is_dir():
dir_files = list(x.iterdir())
atime = (
max(atime, max(safe_time(s) for s in dir_files))
if dir_files
else atime
)
except Exception:
pass
return atime
folder = Path(
get_cache_dir() / CacheManager._storage_manager_folder / self._context
)
folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
folder.mkdir(parents=True, exist_ok=True)
local_filename = local_filename or self.get_hashed_url_file(remote_url)
local_filename = self._conform_filename(local_filename)
@@ -215,23 +184,45 @@ class CacheManager(object):
except Exception:
new_file_size = None
return new_file.as_posix(), new_file_size
def clean_cache(self):
# type: () -> bool
"""
If cache is full, clean it by deleting old/lock files
:return: True if the cache has been cleaned and False otherwise
"""
def safe_time(x):
# noinspection PyBroadException
try:
return x.stat().st_mtime
except Exception:
return 0
def sort_max_access_time(x):
atime = safe_time(x)
# noinspection PyBroadException
try:
if x.is_dir():
dir_files = list(x.iterdir())
atime = max(atime, max(safe_time(s) for s in dir_files)) if dir_files else atime
except Exception:
pass
return atime
folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
folder_files = list(folder.iterdir())
if len(folder_files) <= self._file_limit:
return new_file.as_posix(), new_file_size
return False
# first exclude lock files
lock_files = dict()
files = []
for f in sorted(folder_files, reverse=True, key=sort_max_access_time):
if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(
CacheManager._lockfile_suffix
):
if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix):
# parse the lock filename
name = f.name[
len(CacheManager._lockfile_prefix):-len(
CacheManager._lockfile_suffix
)
]
name = f.name[len(CacheManager._lockfile_prefix) : -len(CacheManager._lockfile_suffix)]
num, _, name = name.partition(".")
lock_files[name] = lock_files.get(name, []) + [f.as_posix()]
else:
@@ -242,7 +233,7 @@ class CacheManager(object):
lock_files.pop(f.name, None)
# delete old files
files = files[self._file_limit:]
files = files[self._file_limit :]
for f in files:
# check if the file is in the lock folder list:
folder_lock = self._folder_locks.get(f.absolute().as_posix())
@@ -279,9 +270,7 @@ class CacheManager(object):
shutil.rmtree(f.as_posix(), ignore_errors=False)
except Exception as e:
# failed deleting folder
LoggerRoot.get_base_logger().debug(
"Exception {}\nFailed deleting folder {}".format(e, f)
)
LoggerRoot.get_base_logger().debug("Exception {}\nFailed deleting folder {}".format(e, f))
# cleanup old lock files
for lock_files in lock_files.values():
@@ -291,8 +280,7 @@ class CacheManager(object):
os.unlink(f)
except BaseException:
pass
return new_file.as_posix(), new_file_size
return True
def lock_cache_folder(self, local_path):
# type: (Union[str, Path]) -> ()
@@ -382,9 +370,7 @@ class CacheManager(object):
except Exception:
return local_copy_path
return CacheManager._local_to_remote_url_lookup.get(
hash(conform_local_copy_path), local_copy_path
)
return CacheManager._local_to_remote_url_lookup.get(hash(conform_local_copy_path), local_copy_path)
@staticmethod
def _add_remote_url(remote_url, local_copy_path):
@@ -411,10 +397,7 @@ class CacheManager(object):
pass
CacheManager._local_to_remote_url_lookup[hash(local_copy_path)] = remote_url
# protect against overuse, so we do not blowup the memory
if (
len(CacheManager._local_to_remote_url_lookup)
> CacheManager.__local_to_remote_url_lookup_max_size
):
if len(CacheManager._local_to_remote_url_lookup) > CacheManager.__local_to_remote_url_lookup_max_size:
# pop the first item (FIFO)
CacheManager._local_to_remote_url_lookup.popitem(last=False)
@@ -429,6 +412,4 @@ class CacheManager(object):
# type: (Optional[str]) -> str
if not context:
return cls._default_context_folder_template
return cls._context_to_folder_lookup.get(
str(context), cls._default_context_folder_template
)
return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template)
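
The `clean_cache` method introduced in this file keeps the newest entries up to the file limit and deletes the rest, returning whether a cleanup happened. A simplified sketch of that idea follows. It assumes plain files only and ignores the lock-file and folder-lock handling that the real method performs; `clean_folder` is a hypothetical name, not part of `CacheManager`.

```python
import os
import tempfile
from pathlib import Path


def clean_folder(folder, file_limit):
    """Delete the oldest files so that at most `file_limit` files remain.

    Returns True if a cleanup pass ran, False if the folder was within the limit.
    Simplified sketch: directories and lock files are not handled here.
    """
    entries = [p for p in Path(folder).iterdir() if p.is_file()]
    if len(entries) <= file_limit:
        return False
    # newest first, so everything past the limit is the oldest content
    entries.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    for stale in entries[file_limit:]:
        try:
            stale.unlink()
        except OSError:
            pass  # best effort: another process may have removed the file already
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(5):
            path = Path(tmp) / "f{}.bin".format(i)
            path.write_bytes(b"x")
            # force distinct, increasing modification times
            os.utime(str(path), (1000 + i, 1000 + i))
        print(clean_folder(tmp, file_limit=3), sorted(p.name for p in Path(tmp).iterdir()))
```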

View File

@@ -113,6 +113,7 @@ if TYPE_CHECKING:
import pandas
import numpy
from PIL import Image
from .router.router import HttpRouter
# Forward declaration to help linters
TaskInstance = TypeVar("TaskInstance", bound="Task")
@@ -185,6 +186,11 @@ class Task(_Task):
_launch_multi_node_section = "launch_multi_node"
_launch_multi_node_instance_tag = "multi_node_instance"
_external_endpoint_port_map = {"http": "_PORT", "tcp": "external_tcp_port"}
_external_endpoint_address_map = {"http": "_ADDRESS", "tcp": "external_address"}
_external_endpoint_service_map = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"}
_external_endpoint_internal_port_map = {"http": "_PORT", "tcp": "upstream_task_port"}
class _ConnectedParametersType(object):
argparse = "argument_parser"
dictionary = "dictionary"
@@ -218,6 +224,8 @@ class Task(_Task):
self._resource_monitor = None
self._calling_filename = None
self._remote_functions_generated = {}
self._external_endpoint_ports = {}
self._http_router = None
# register atexit, so that we mark the task as stopped
self._at_exit_called = False
@@ -842,6 +850,49 @@ class Task(_Task):
task._set_startup_info()
return task
def get_http_router(self):
# type: () -> HttpRouter
"""
Retrieve an instance of `HttpRouter` to manage an external HTTP endpoint and intercept traffic.
The `HttpRouter` serves as a traffic manager, enabling the creation and configuration of local and external
routes to redirect, monitor, or manipulate HTTP requests and responses. It handles routing needs via a
proxy setup that provides request/response interception and telemetry reporting for
applications that require HTTP endpoint management.
Example usage:
.. code-block:: py
def request_callback(request, persistent_state):
persistent_state["last_request_time"] = time.time()
def response_callback(response, request, persistent_state):
print("Latency:", time.time() - persistent_state["last_request_time"])
if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify":
new_content = response.body.replace(b"modify", b"modified")
headers = copy.deepcopy(response.headers)
headers["Content-Length"] = str(len(new_content))
return Response(status_code=response.status_code, headers=headers, content=new_content)
router = Task.current_task().get_http_router()
router.set_local_proxy_parameters(incoming_port=9000)
router.create_local_route(
source="/",
target="http://localhost:8000",
request_callback=request_callback,
response_callback=response_callback,
endpoint_telemetry={"model": "MyModel"}
)
router.deploy(wait=True)
"""
try:
from .router.router import HttpRouter # noqa
except ImportError:
raise UsageError("Could not import `HttpRouter`. Please run `pip install clearml[router]`")
if self._http_router is None:
self._http_router = HttpRouter(self)
return self._http_router
def request_external_endpoint(
self, port, protocol="http", wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0
):
@@ -850,14 +901,14 @@ class Task(_Task):
Request an external endpoint for an application
:param port: Port the application is listening to
:param protocol: As of now, only `http` is supported
:param protocol: `http` or `tcp`
:param wait: If True, wait for the endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait and None will be returned
:return: If wait is False, this method will return None.
If no endpoint could be found while waiting, this mehtod returns None.
If no endpoint could be found while waiting, this method returns None.
Otherwise, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
@@ -865,75 +916,151 @@ class Task(_Task):
- protocol - the protocol used by the endpoint
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
if protocol not in self._external_endpoint_port_map.keys():
raise ValueError("Invalid protocol: {}".format(protocol))
# sync with router - get data from Task
if not self._external_endpoint_ports.get(protocol):
self.reload()
assigned_port = self._get_runtime_properties().get("_PORT")
if assigned_port:
self._external_endpoint_port = assigned_port
if getattr(self, "_external_endpoint_port", None):
if self._external_endpoint_port != port: # noqa
raise ValueError(
"Only one endpoint can be requested at the moment. Port already exposed is: {}".format(
self._external_endpoint_port
)
internal_port = self._get_runtime_properties().get(self._external_endpoint_internal_port_map[protocol])
if internal_port:
self._external_endpoint_ports[protocol] = internal_port
# check if we are trying to change the port - currently not allowed
if self._external_endpoint_ports.get(protocol):
if self._external_endpoint_ports.get(protocol) == port:
# we already set this endpoint, so do nothing
return
raise ValueError(
"Only one endpoint per protocol can be requested at the moment. Port already exposed is: {}".format(
self._external_endpoint_ports.get(protocol)
)
return
)
# mark for the router our request
# noinspection PyProtectedMember
self._set_runtime_properties(
{"_SERVICE": "EXTERNAL", "_ADDRESS": get_private_ip(), "_PORT": port}
{
"_SERVICE": self._external_endpoint_service_map[protocol],
self._external_endpoint_address_map[protocol]: get_private_ip(),
self._external_endpoint_port_map[protocol]: port,
}
)
# required system_tag for the router to catch the routing request
self.set_system_tags((self.get_system_tags() or []) + ["external_service"])
self._external_endpoint_port = port
self._external_endpoint_ports[protocol] = port
if wait:
return self.wait_for_external_endpoint(wait_interval_seconds=wait_interval_seconds)
return self.wait_for_external_endpoint(
wait_interval_seconds=wait_interval_seconds,
wait_timeout_seconds=wait_timeout_seconds,
protocol=protocol
)
return None
def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0):
# type: (float) -> Optional[Dict]
def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0, protocol="http"):
# type: (float, float, Optional[str]) -> Union[Optional[Dict], List[Optional[Dict]]]
"""
Wait for an external endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait
:param protocol: `http` or `tcp`. Wait for an endpoint to be assigned based on the protocol.
If None, wait for all supported protocols
:return: If no endpoint could be found while waiting, this mehtod returns None.
Otherwise, it returns a dictionary containing the following values:
:return: If no endpoint could be found while waiting, this method returns None.
If a protocol has been specified, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
If no protocol is specified, it returns a list of dictionaries containing the values above,
for each protocol requested and waited
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
LoggerRoot.get_base_logger().warning("No external endpoints have been requested")
if protocol:
return self._wait_for_external_endpoint(
wait_interval_seconds=wait_interval_seconds,
wait_timeout_seconds=wait_timeout_seconds,
protocol=protocol,
warn=True
)
results = []
protocols = ["http", "tcp"]
waited_protocols = []
for protocol_ in protocols:
start_time = time.time()
result = self._wait_for_external_endpoint(
wait_interval_seconds=wait_interval_seconds,
wait_timeout_seconds=wait_timeout_seconds,
protocol=protocol_,
warn=False,
)
elapsed = time.time() - start_time
if result:
results.append(result)
wait_timeout_seconds -= elapsed
if wait_timeout_seconds > 0 or result:
waited_protocols.append(protocol_)
unwaited_protocols = [p for p in protocols if p not in waited_protocols]
if wait_timeout_seconds <= 0 and unwaited_protocols:
LoggerRoot.get_base_logger().warning(
"Timeout exceeded while waiting for {} endpoint(s)".format(",".join(unwaited_protocols))
)
return results
def _wait_for_external_endpoint(
self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0, protocol="http", warn=True
):
if not self._external_endpoint_ports.get(protocol):
self.reload()
internal_port = self._get_runtime_properties().get(self._external_endpoint_internal_port_map[protocol])
if internal_port:
self._external_endpoint_ports[protocol] = internal_port
if not self._external_endpoint_ports.get(protocol):
if warn:
LoggerRoot.get_base_logger().warning("No external {} endpoints have been requested".format(protocol))
return None
start_time = time.time()
while True:
self.reload()
# noinspection PyProtectedMember
runtime_props = self._get_runtime_properties()
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
if not getattr(self, "_external_endpoint_port", None):
self._external_endpoint_port = runtime_props.get("_PORT")
endpoint, browser_endpoint = None, None
if protocol == "http":
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
elif protocol == "tcp":
health_check = runtime_props.get("upstream_task_port")
if health_check:
endpoint = (
runtime_props.get(self._external_endpoint_address_map[protocol])
+ ":"
+ str(runtime_props.get(self._external_endpoint_port_map[protocol]))
)
if endpoint or browser_endpoint:
return {
"endpoint": endpoint,
"browser_endpoint": browser_endpoint,
"port": self._external_endpoint_port,
"protocol": "http",
"port": self._external_endpoint_ports[protocol],
"protocol": protocol,
}
if time.time() >= start_time + wait_timeout_seconds:
LoggerRoot.get_base_logger().warning("Timeout exceeded while waiting for endpoint")
if warn:
LoggerRoot.get_base_logger().warning(
"Timeout exceeded while waiting for {} endpoint".format(protocol)
)
return None
time.sleep(wait_interval_seconds)
def list_external_endpoints(self):
# type: () -> List[Dict]
def list_external_endpoints(self, protocol=None):
# type: (Optional[str]) -> List[Dict]
"""
List all external endpoints assigned
:param protocol: If None, list all external endpoints. Otherwise, only list endpoints
that use this protocol
:return: A list of dictionaries. Each dictionary contains the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
@@ -941,23 +1068,37 @@ class Task(_Task):
- protocol - the protocol used by the endpoint
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
self.reload()
self._external_endpoint_port = self._get_runtime_properties().get("_PORT")
if not getattr(self, "_external_endpoint_port", None):
LoggerRoot.get_base_logger().warning("No external endpoints have been requested")
return []
runtime_props = self._get_runtime_properties()
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
return [
{
"endpoint": endpoint,
"browser_endpoint": browser_endpoint,
"port": self._external_endpoint_port,
"protocol": "http",
}
]
results = []
protocols = [protocol] if protocol is not None else ["http", "tcp"]
for protocol in protocols:
internal_port = runtime_props.get(self._external_endpoint_internal_port_map[protocol])
if internal_port:
self._external_endpoint_ports[protocol] = internal_port
else:
continue
endpoint, browser_endpoint = None, None
if protocol == "http":
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
elif protocol == "tcp":
health_check = runtime_props.get("upstream_task_port")
if health_check:
endpoint = (
runtime_props.get(self._external_endpoint_address_map[protocol])
+ ":"
+ str(runtime_props.get(self._external_endpoint_port_map[protocol]))
)
if endpoint or browser_endpoint:
results.append(
{
"endpoint": endpoint,
"browser_endpoint": browser_endpoint,
"port": internal_port,
"protocol": protocol,
}
)
return results
@classmethod
def create(
@@ -2267,6 +2408,26 @@ class Task(_Task):
# mark task as stopped
self.stopped(force=force, status_message=str(status_message) if status_message else None)
def mark_stop_request(self, force=False, status_message=None):
# type: (bool, Optional[str]) -> ()
"""
Request a task to stop. This will not change the task status,
but will mark a request for an agent or SDK to actually stop the Task.
This will trigger the Task's abort callback, and at the end will
change the task status to stopped and kill the Task's processes
Notice: calling this on your own Task will cause
the watchdog to call the on_abort callback and kill the process
:param bool force: If not True, call fails if the task status is not 'in_progress'
:param str status_message: Optional, add status change message to the stop request.
This message will be stored as status_message on the Task's info panel
"""
# flush any outstanding logs
self.flush(wait_for_uploads=True)
# request task stop
return self.stop_request(self, force=force, status_message=status_message)
def flush(self, wait_for_uploads=False):
# type: (bool) -> bool
"""
@@ -2523,7 +2684,7 @@ class Task(_Task):
- PIL.Image - whatever extensions PIL supports (default ``.png``)
- In case the ``serialization_function`` argument is set - any extension is supported
:param Callable[Any, Union[bytes, bytearray]] serialization_function: A serialization function that takes one
:param serialization_function: A serialization function that takes one
parameter of any type which is the object to be serialized. The function should return
a `bytes` or `bytearray` object, which represents the serialized object. Note that the object will be
immediately serialized using this function, thus other serialization methods will not be used
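
The per-protocol lookup tables added to `Task` in this file decide which runtime property keys carry the service type, address, and port of an endpoint request. The sketch below shows that mapping in isolation. The key and value strings are copied from the maps in the diff, while `build_endpoint_request` and the module-level constant names are hypothetical and exist only for illustration.

```python
# values copied from the `_external_endpoint_*_map` class attributes in the diff
PORT_KEYS = {"http": "_PORT", "tcp": "external_tcp_port"}
ADDRESS_KEYS = {"http": "_ADDRESS", "tcp": "external_address"}
SERVICE_VALUES = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"}


def build_endpoint_request(protocol, port, address):
    """Build the runtime-properties dictionary for one endpoint request.

    Hypothetical helper: it only reproduces the dictionary construction,
    it does not talk to a ClearML server.
    """
    if protocol not in PORT_KEYS:
        raise ValueError("Invalid protocol: {}".format(protocol))
    return {
        "_SERVICE": SERVICE_VALUES[protocol],
        ADDRESS_KEYS[protocol]: address,
        PORT_KEYS[protocol]: port,
    }


if __name__ == "__main__":
    print(build_endpoint_request("http", 9000, "10.0.0.5"))
    print(build_endpoint_request("tcp", 5432, "10.0.0.5"))
```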

View File

@@ -281,12 +281,16 @@ class SafeQueue(object):
# Fix the python Queue and Use SimpleQueue write so it uses a single OS write,
# making it atomic message passing
self._q = SimpleQueue(*args, **kwargs)
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences,PyProtectedMember
self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
except Exception:
pass
# on Windows, queue communication is done via pipes, no need to override the _send_bytes method
if sys.platform != 'win32':
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences,PyProtectedMember
self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer)
except Exception:
pass
self._internal_q = None
# Note we should Never! assign a new object to `self._q_size`, just work with the initial object
self._q_size = [] # list of PIDs we pushed, so this is atomic.

View File

@@ -1 +1 @@
__version__ = "1.16.5"
__version__ = "1.17.1"

View File

@@ -0,0 +1,42 @@
"""
Example on how to use the ClearML HTTP router.
For this example, you would first need a webserver to route the traffic to:
`simple_webserver.py` launches such a server. Running the script will start a
webserver, bound to localhost:8000.
Then, when running this example, it creates a router which binds to 0.0.0.0:9000.
A local route is then created, which will proxy all traffic from
`http://<PRIVATE_IP>:9000/example_source` to `http://localhost:8000/serve`.
Traffic can be intercepted both on request and response via callbacks. See
`request_callback` and `response_callback`.
By default, the route traffic is monitored and telemetry is sent to the ClearML
server. To disable this, pass `endpoint_telemetry=False` when creating the route
"""
import time
from clearml import Task


def request_callback(request, persistent_state):
    persistent_state["last_request_time"] = time.time()


def response_callback(response, request, persistent_state):
    print("Latency:", time.time() - persistent_state["last_request_time"])


if __name__ == "__main__":
    task = Task.init(project_name="Router Example", task_name="Router Example")
    router = task.get_http_router()
    router.set_local_proxy_parameters(incoming_port=9000, default_target="http://localhost:8000")
    router.create_local_route(
        source="/example_source",
        target="http://localhost:8000/serve",  # route traffic to this address
        request_callback=request_callback,  # intercept requests
        response_callback=response_callback,  # intercept responses
        endpoint_telemetry={"model": "MyModel"}  # set this to False to disable telemetry
    )
    router.deploy(wait=True)

    # run `curl http://localhost:9000/example_source/1`
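
The two callbacks in this example share data through `persistent_state`. The sketch below illustrates that hand-off without a running router. It assumes the same dictionary is passed to both callbacks for a given request, which is what the example relies on. `simulate_round_trip` and `upstream` are hypothetical stand-ins and do not reflect how the router is implemented, and the response callback stores the latency instead of printing it.

```python
import time


def request_callback(request, persistent_state):
    # remember when the request was seen
    persistent_state["last_request_time"] = time.time()


def response_callback(response, request, persistent_state):
    # store the elapsed time instead of printing it, so it can be inspected
    persistent_state["last_latency"] = time.time() - persistent_state["last_request_time"]


def simulate_round_trip(request, upstream, persistent_state):
    """Hypothetical stand-in for the proxy: call both callbacks around `upstream`."""
    request_callback(request, persistent_state)
    response = upstream(request)
    response_callback(response, request, persistent_state)
    return response


if __name__ == "__main__":
    state = {}
    reply = simulate_round_trip("GET /example_source/1", lambda req: "ok", state)
    print(reply, sorted(state))
```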

View File

@@ -0,0 +1,2 @@
clearml
clearml[router]

View File

@@ -0,0 +1,44 @@
"""
A simple webserver, used as a tool to showcase the capabilities of
ClearML HTTP router. See `http_router.py` for more details.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI()

actions = {
    "1": {"name": "Action 1", "description": "This is model action 1"},
    "2": {"name": "Action 2", "description": "This is model action 2"},
    "3": {"name": "Action 3", "description": "This is model action 3"},
}


class Item(BaseModel):
    name: str
    description: str


@app.get("/")
def read_root():
    return {"message": "Welcome to the FastAPI application!"}


@app.get("/serve/{action}", response_model=Item)
def read_item(action: str):
    if action in actions:
        return actions[action]
    else:
        raise HTTPException(status_code=404, detail="Item not found")


if __name__ == "__main__":
    uvicorn.run(
        "simple_webserver:app",
        host="127.0.0.1",
        port=8000,
        reload=True
    )

View File

@@ -11,7 +11,7 @@ Pillow>=4.1.1
psutil>=3.4.2
pyparsing>=2.0.3
python-dateutil>=2.6.1
pyjwt>=2.4.0,<2.9.0 ; python_version > '3.5'
pyjwt>=2.4.0,<2.10.0 ; python_version > '3.5'
pyjwt>=1.6.4,<2.0.0 ; python_version <= '3.5'
PyYAML>=3.12
referencing<0.40 ; python_version >= '3.8'

View File

@@ -1,6 +1,6 @@
"""
ClearML - Artificial Intelligence Version Control
https://github.com/allegroai/clearml
ClearML Inc
https://github.com/clearml/clearml
"""
import os.path
@@ -21,8 +21,7 @@ long_description = read_text(os.path.join(here, 'README.md'))
# fix github, dark logo hack.
long_description = long_description.replace(
"""<img align="center" src="docs/clearml-logo.svg#gh-light-mode-only" alt="Clear|ML"><img align="center" src="docs/clearml-logo-dark.svg#gh-dark-mode-only" alt="Clear|ML">""", # noqa
"""<a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/clearml-logo.svg?raw=true" width="250px"></a>""", # noqa
1
"""<a href="https://clear.ml"><img src="https://raw.githubusercontent.com/clearml/clearml/refs/heads/master/docs/clearml-logo.svg" width="250px"></a>""", # noqa
)
@@ -46,7 +45,7 @@ setup(
long_description=long_description,
long_description_content_type='text/markdown',
# The project's main homepage.
url='https://github.com/allegroai/clearml',
url='https://github.com/clearml/clearml',
author='ClearML',
author_email='support@clear.ml',
license='Apache License 2.0',
@@ -71,6 +70,7 @@ setup(
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'License :: OSI Approved :: Apache Software License',
],
keywords='clearml trains development machine deep learning version control machine-learning machinelearning '
@@ -87,6 +87,11 @@ setup(
'gs': [
'google-cloud-storage>=1.13.2',
],
'router': [
'fastapi>=0.115.2',
'uvicorn>=0.31.1',
'httpx>=0.27.2'
]
},
package_data={
'clearml': ['config/default/*.conf', 'backend_api/config/default/*.conf']