diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index ebbbf0cc..52de4877 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -55,7 +55,7 @@ a project may be further defined and clarified by project maintainers. ## Enforcement Instances of abusive, harassing, or otherwise unacceptable behavior may be -reported by contacting the project team at . All +reported by contacting the project team at . All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. diff --git a/LICENSE b/LICENSE index c499a151..97000805 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2024 ClearML + Copyright 2025 ClearML Inc Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index 4b7e9c3c..47a44609 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,8 @@ **[ClearML](https://clear.ml) - Auto-Magical Suite of tools to streamline your AI workflow
Experiment Manager, MLOps/LLMOps and Data-Management** -[![GitHub license](https://img.shields.io/github/license/allegroai/clearml.svg)](https://img.shields.io/github/license/allegroai/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)
-[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml) +[![GitHub license](https://img.shields.io/github/license/clearml/clearml.svg)](https://img.shields.io/github/license/clearml/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)
+[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/clearml)](https://artifacthub.io/packages/search?repo=clearml) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml) `🌟 ClearML is open-source - Leave a star to support the project! 🌟` @@ -17,21 +17,20 @@ --- ### ClearML -*Formerly known as Allegro Trains* ClearML is a ML/DL development and production suite. It contains FIVE main modules: - [Experiment Manager](#clearml-experiment-manager) - Automagical experiment tracking, environments and results -- [MLOps / LLMOps](https://github.com/allegroai/clearml-agent) - Orchestration, Automation & Pipelines solution for ML/DL/GenAI jobs (Kubernetes / Cloud / bare-metal) -- [Data-Management](https://github.com/allegroai/clearml/blob/master/docs/datasets.md) - Fully differentiable data management & version control solution on top of object-storage +- [MLOps / LLMOps](https://github.com/clearml/clearml-agent) - Orchestration, Automation & Pipelines solution for ML/DL/GenAI jobs (Kubernetes / Cloud / bare-metal) +- [Data-Management](https://github.com/clearml/clearml/blob/master/docs/datasets.md) - Fully differentiable data management & version control solution on top of object-storage (S3 / GS / Azure / NAS) -- [Model-Serving](https://github.com/allegroai/clearml-serving) - *cloud-ready* Scalable model serving solution! +- [Model-Serving](https://github.com/clearml/clearml-serving) - *cloud-ready* Scalable model serving solution! 
- **Deploy new model endpoints in under 5 minutes** - Includes optimized GPU serving support backed by Nvidia-Triton - **with out-of-the-box Model Monitoring** - [Reports](https://clear.ml/docs/latest/docs/webapp/webapp_reports) - Create and share rich MarkDown documents supporting embeddable online content - :fire: [Orchestration Dashboard](https://clear.ml/docs/latest/docs/webapp/webapp_orchestration_dash/) - Live rich dashboard for your entire compute cluster (Cloud / Kubernetes / On-Prem) -- **NEW** 💥 [Fractional GPUs](https://github.com/allegroai/clearml-fractional-gpu) - Container based, driver level GPU memory limitation 🙀 !!! +- **NEW** 💥 [Fractional GPUs](https://github.com/clearml/clearml-fractional-gpu) - Container based, driver level GPU memory limitation 🙀 !!! Instrumenting these components is the **ClearML-server**, see [Self-Hosting](https://clear.ml/docs/latest/docs/deploying_clearml/clearml_server) & [Free tier Hosting](https://app.clear.ml) @@ -48,20 +47,20 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt - - + - - + - - + @@ -79,8 +78,8 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt - - + + @@ -90,8 +89,8 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt - - + +
Step 1 - Experiment Management + Step 1 - Experiment Management Open In Colab
Step 2 - Remote Execution Agent Setup + Step 2 - Remote Execution Agent Setup Open In Colab
Step 3 - Remotely Execute Tasks + Step 3 - Remotely Execute Tasks Open In Colab
Datasets
Pipelines
@@ -115,13 +114,13 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt * Resource Monitoring (CPU/GPU utilization, temperature, IO, network, etc.) * Model snapshots (With optional automatic upload to central storage: Shared folder, S3, GS, Azure, Http) * Artifacts log & store (Shared folder, S3, GS, Azure, Http) - * Tensorboard/[TensorboardX](https://github.com/allegroai/clearml/tree/master/examples/frameworks/tensorboardx) scalars, metrics, histograms, **images, audio and video samples** - * [Matplotlib & Seaborn](https://github.com/allegroai/clearml/tree/master/examples/frameworks/matplotlib) + * Tensorboard/[TensorboardX](https://github.com/clearml/clearml/tree/master/examples/frameworks/tensorboardx) scalars, metrics, histograms, **images, audio and video samples** + * [Matplotlib & Seaborn](https://github.com/clearml/clearml/tree/master/examples/frameworks/matplotlib) * [ClearML Logger](https://clear.ml/docs/latest/docs/fundamentals/logger) interface for complete flexibility. 
* Extensive platform support and integrations - * Supported ML/DL frameworks: [PyTorch](https://github.com/allegroai/clearml/tree/master/examples/frameworks/pytorch) (incl' [ignite](https://github.com/allegroai/clearml/tree/master/examples/frameworks/ignite) / [lightning](https://github.com/allegroai/clearml/tree/master/examples/frameworks/pytorch-lightning)), [Tensorflow](https://github.com/allegroai/clearml/tree/master/examples/frameworks/tensorflow), [Keras](https://github.com/allegroai/clearml/tree/master/examples/frameworks/keras), [AutoKeras](https://github.com/allegroai/clearml/tree/master/examples/frameworks/autokeras), [FastAI](https://github.com/allegroai/clearml/tree/master/examples/frameworks/fastai), [XGBoost](https://github.com/allegroai/clearml/tree/master/examples/frameworks/xgboost), [LightGBM](https://github.com/allegroai/clearml/tree/master/examples/frameworks/lightgbm), [MegEngine](https://github.com/allegroai/clearml/tree/master/examples/frameworks/megengine) and [Scikit-Learn](https://github.com/allegroai/clearml/tree/master/examples/frameworks/scikit-learn) + * Supported ML/DL frameworks: [PyTorch](https://github.com/clearml/clearml/tree/master/examples/frameworks/pytorch) (incl' [ignite](https://github.com/clearml/clearml/tree/master/examples/frameworks/ignite) / [lightning](https://github.com/clearml/clearml/tree/master/examples/frameworks/pytorch-lightning)), [Tensorflow](https://github.com/clearml/clearml/tree/master/examples/frameworks/tensorflow), [Keras](https://github.com/clearml/clearml/tree/master/examples/frameworks/keras), [AutoKeras](https://github.com/clearml/clearml/tree/master/examples/frameworks/autokeras), [FastAI](https://github.com/clearml/clearml/tree/master/examples/frameworks/fastai), [XGBoost](https://github.com/clearml/clearml/tree/master/examples/frameworks/xgboost), [LightGBM](https://github.com/clearml/clearml/tree/master/examples/frameworks/lightgbm), 
[MegEngine](https://github.com/clearml/clearml/tree/master/examples/frameworks/megengine) and [Scikit-Learn](https://github.com/clearml/clearml/tree/master/examples/frameworks/scikit-learn) * Seamless integration (including version control) with [**Jupyter Notebook**](https://jupyter.org/) - and [*PyCharm* remote debugging](https://github.com/allegroai/trains-pycharm-plugin) + and [*PyCharm* remote debugging](https://github.com/clearml/trains-pycharm-plugin) #### [Start using ClearML](https://clear.ml/docs/latest/docs/getting_started/ds/ds_first_steps) @@ -163,13 +162,13 @@ The ClearML run-time components: * The ClearML Server - for storing experiment, model, and workflow data; supporting the Web UI experiment manager and MLOps automation for reproducibility and tuning. It is available as a hosted service and open source for you to deploy your own ClearML Server. * The ClearML Agent - for MLOps orchestration, experiment and workflow reproducibility, and scalability. -clearml-architecture +clearml-architecture ## Additional Modules -- [clearml-session](https://github.com/allegroai/clearml-session) - **Launch remote JupyterLab / VSCode-server inside any docker, on Cloud/On-Prem machines** -- [clearml-task](https://github.com/allegroai/clearml/blob/master/docs/clearml-task.md) - Run any codebase on remote machines with full remote logging of Tensorboard, Matplotlib & Console outputs -- [clearml-data](https://github.com/allegroai/clearml/blob/master/docs/datasets.md) - **CLI for managing and versioning your datasets, including creating / uploading / downloading of data from S3/GS/Azure/NAS** +- [clearml-session](https://github.com/clearml/clearml-session) - **Launch remote JupyterLab / VSCode-server inside any docker, on Cloud/On-Prem machines** +- [clearml-task](https://github.com/clearml/clearml/blob/master/docs/clearml-task.md) - Run any codebase on remote machines with full remote logging of Tensorboard, Matplotlib & Console outputs +- 
[clearml-data](https://github.com/clearml/clearml/blob/master/docs/datasets.md) - **CLI for managing and versioning your datasets, including creating / uploading / downloading of data from S3/GS/Azure/NAS** - [AWS Auto-Scaler](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler) - Automatically spin EC2 instances based on your workloads with preconfigured budget! No need for AKE! - [Hyper-Parameter Optimization](https://clear.ml/docs/latest/docs/guides/optimization/hyper-parameter-optimization/examples_hyperparam_opt) - Optimize any code with black-box approach and state-of-the-art Bayesian optimization algorithms - [Automation Pipeline](https://clear.ml/docs/latest/docs/guides/pipeline/pipeline_controller) - Build pipelines based on existing experiments / jobs, supports building pipelines of pipelines! @@ -212,7 +211,7 @@ If ClearML is part of your development process / project / publication, please c @misc{clearml, title = {ClearML - Your entire MLOps stack in one open-source tool}, year = {2024}, -note = {Software available from http://github.com/allegroai/clearml}, +note = {Software available from http://github.com/clearml/clearml}, url={https://clear.ml/}, author = {ClearML}, } @@ -222,17 +221,17 @@ author = {ClearML}, For more information, see the [official documentation](https://clear.ml/docs) and [on YouTube](https://www.youtube.com/c/ClearML). -For examples and use cases, check the [examples folder](https://github.com/allegroai/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides). +For examples and use cases, check the [examples folder](https://github.com/clearml/clearml/tree/master/examples) and [corresponding documentation](https://clear.ml/docs/latest/docs/guides). 
If you have any questions: post on our [Slack Channel](https://joinslack.clear.ml), or tag your questions on [stackoverflow](https://stackoverflow.com/questions/tagged/clearml) with '**[clearml](https://stackoverflow.com/questions/tagged/clearml)**' tag (*previously [trains](https://stackoverflow.com/questions/tagged/trains) tag*). -For feature requests or bug reports, please use [GitHub issues](https://github.com/allegroai/clearml/issues). +For feature requests or bug reports, please use [GitHub issues](https://github.com/clearml/clearml/issues). Additionally, you can always find us at *info@clear.ml* ## Contributing -**PRs are always welcome** :heart: See more details in the ClearML [Guidelines for Contributing](https://github.com/allegroai/clearml/blob/master/docs/contributing.md). +**PRs are always welcome** :heart: See more details in the ClearML [Guidelines for Contributing](https://github.com/clearml/clearml/blob/master/docs/contributing.md). _May the force (and the goddess of learning rates) be with you!_ diff --git a/clearml/automation/auto_scaler.py b/clearml/automation/auto_scaler.py index 5e7c4cb0..bd3c56a1 100644 --- a/clearml/automation/auto_scaler.py +++ b/clearml/automation/auto_scaler.py @@ -15,7 +15,7 @@ from ..backend_api.session import defs from ..backend_api.session.client import APIClient from ..debugging import get_logger -# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ":" +# Worker's id in clearml would be composed of prefix, name, instance_type and cloud_id separated by ":" # Example: 'test:m1:g4dn.4xlarge:i-07cf7d6750455cb62' # cloud_id might be missing diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 08612282..83d2d5db 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -59,6 +59,7 @@ class PipelineController(object): _update_execution_plot_interval = 5.*60 _update_progress_interval = 10. 
_monitor_node_interval = 5.*60 + _pipeline_as_sub_project_cached = None _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow') _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details') _evaluated_return_values = {} # TID: pipeline_name @@ -221,7 +222,8 @@ class PipelineController(object): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> None """ @@ -316,6 +318,11 @@ class PipelineController(object): the steps from a functions, otherwise all global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). + If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. """ if auto_version_bump is not None: warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning) @@ -328,7 +335,7 @@ class PipelineController(object): self._version = str(version).strip() if version else None if self._version and not Version.is_valid_version_string(self._version): raise ValueError( - "Setting non-semantic dataset version '{}'".format(self._version) + "Setting non-semantic pipeline version '{}'".format(self._version) ) self._pool_frequency = pool_frequency * 60. 
self._thread = None @@ -348,19 +355,13 @@ class PipelineController(object): self._reporting_lock = RLock() self._pipeline_task_status_failed = None self._mock_execution = False # used for nested pipelines (eager execution) - self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17")) self._last_progress_update_time = 0 self._artifact_serialization_function = artifact_serialization_function self._artifact_deserialization_function = artifact_deserialization_function self._skip_global_imports = skip_global_imports + self._enable_local_imports = enable_local_imports if not self._task: - task_name = name or project or '{}'.format(datetime.now()) - if self._pipeline_as_sub_project: - parent_project = (project + "/" if project else "") + self._project_section - project_name = "{}/{}".format(parent_project, task_name) - else: - parent_project = None - project_name = project or 'Pipelines' + pipeline_project_args = self._create_pipeline_project_args(name, project) # if user disabled the auto-repo, we force local script storage (repo="" or repo=False) set_force_local_repo = False @@ -369,8 +370,8 @@ class PipelineController(object): set_force_local_repo = True self._task = Task.init( - project_name=project_name, - task_name=task_name, + project_name=pipeline_project_args["project_name"], + task_name=pipeline_project_args["task_name"], task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, reuse_last_task_id=False @@ -382,15 +383,13 @@ class PipelineController(object): self._task._wait_for_repo_detection(timeout=300.) 
Task.force_store_standalone_script(force=False) - # make sure project is hidden - if self._pipeline_as_sub_project: - get_or_create_project( - self._task.session, project_name=parent_project, system_tags=["hidden"]) - get_or_create_project( - self._task.session, project_name=project_name, - project_id=self._task.project, system_tags=self._project_system_tags) - + self._create_pipeline_projects( + task=self._task, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) + if output_uri is not None: self._task.output_uri = output_uri self._output_uri = output_uri @@ -401,7 +400,7 @@ class PipelineController(object): self._task.set_script(repository=repo, branch=repo_branch, commit=repo_commit, working_dir=working_dir) self._auto_connect_task = bool(self._task) # make sure we add to the main Task the pipeline tag - if self._task and not self._pipeline_as_sub_project: + if self._task and not self._pipeline_as_sub_project(): self._task.add_tags([self._tag]) self._monitored_nodes = {} # type: Dict[str, dict] @@ -411,9 +410,9 @@ class PipelineController(object): else self._default_retry_on_failure_callback # add direct link to the pipeline page - if self._pipeline_as_sub_project and self._task: + if self._pipeline_as_sub_project() and self._task: if add_run_number and self._task.running_locally(): - self._add_pipeline_name_run_number() + self._add_pipeline_name_run_number(self._task) # noinspection PyProtectedMember self._task.get_logger().report_text('ClearML pipeline page: {}'.format( '{}/pipelines/{}/experiments/{}'.format( @@ -423,6 +422,12 @@ class PipelineController(object): )) ) + @classmethod + def _pipeline_as_sub_project(cls): + if cls._pipeline_as_sub_project_cached is None: + cls._pipeline_as_sub_project_cached = bool(Session.check_min_api_server_version("2.17")) + return cls._pipeline_as_sub_project_cached + def 
set_default_execution_queue(self, default_execution_queue): # type: (Optional[str]) -> None """ @@ -1224,7 +1229,7 @@ class PipelineController(object): :param bool wait_on_upload: Whether the upload should be synchronous, forcing the upload to complete before continuing. - :param Callable[Any, Union[bytes, bytearray]] serialization_function: A serialization function that takes one + :param serialization_function: A serialization function that takes one parameter of any type which is the object to be serialized. The function should return a `bytes` or `bytearray` object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used @@ -1440,6 +1445,170 @@ class PipelineController(object): """ return self._pipeline_args + @classmethod + def _create_pipeline_project_args(cls, name, project): + task_name = name or project or '{}'.format(datetime.now()) + if cls._pipeline_as_sub_project(): + parent_project = (project + "/" if project else "") + cls._project_section + project_name = "{}/{}".format(parent_project, task_name) + else: + parent_project = None + project_name = project or 'Pipelines' + return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name} + + @classmethod + def _create_pipeline_projects(cls, task, parent_project, project_name): + # make sure project is hidden + if not cls._pipeline_as_sub_project(): + return + get_or_create_project(Task._get_default_session(), project_name=parent_project, system_tags=["hidden"]) + return get_or_create_project( + Task._get_default_session(), + project_name=project_name, + project_id=task.project, + system_tags=cls._project_system_tags, + ) + + @classmethod + def create( + cls, + project_name, # type: str + task_name, # type: str + repo=None, # type: str + branch=None, # type: Optional[str] + commit=None, # type: Optional[str] + script=None, # type: Optional[str] + working_directory=None, 
# type: Optional[str] + packages=None, # type: Optional[Union[bool, Sequence[str]]] + requirements_file=None, # type: Optional[Union[str, Path]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + argparse_args=None, # type: Optional[Sequence[Tuple[str, str]]] + force_single_script_file=False, # type: bool + version=None, # type: Optional[str] + add_run_number=True, # type: bool + ): + # type: (...) -> PipelineController + """ + Manually create and populate a new Pipeline in the system. + Supports pipelines from functions, decorators and tasks. + + :param project_name: Set the project name for the pipeline. + :param task_name: Set the name of the remote pipeline.. + :param repo: Remote URL for the repository to use, or path to local copy of the git repository. + Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'. If ``repo`` is specified, then + the ``script`` parameter must also be specified + :param branch: Select specific repository branch/tag (implies the latest commit from the branch) + :param commit: Select specific commit ID to use (default: latest commit, + or when used with local repository matching the local commit ID) + :param script: Specify the entry point script for the remote execution. When used in tandem with + remote git repository the script should be a relative path inside the repository, + for example: './source/train.py' . When used with local repository path it supports a + direct path to a file inside the local repository itself, for example: '~/project/source/train.py' + :param working_directory: Working directory to launch the script from. Default: repository root folder. + Relative to repo root or local folder. + :param packages: Manually specify a list of required packages. Example: ``["tqdm>=2.1", "scikit-learn"]`` + or `True` to automatically create requirements + based on locally installed packages (repository must be local). 
+ :param requirements_file: Specify requirements.txt file to install when setting the session. + If not provided, the requirements.txt from the repository will be used. + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value) + Notice, only supported if the codebase itself uses argparse.ArgumentParser + :param force_single_script_file: If True, do not auto-detect local repository + + :return: The newly created PipelineController + """ + pipeline_project_args = cls._create_pipeline_project_args( + name=task_name, project=project_name + ) + pipeline_controller = Task.create( + project_name=pipeline_project_args["project_name"], + task_name=pipeline_project_args["task_name"], + task_type=Task.TaskTypes.controller, + repo=repo, + branch=branch, + commit=commit, + script=script, + working_directory=working_directory, + packages=packages, + requirements_file=requirements_file, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + argparse_args=argparse_args, + add_task_init_call=False, + force_single_script_file=force_single_script_file + ) + cls._create_pipeline_projects( + task=pipeline_controller, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) + pipeline_controller.set_system_tags((pipeline_controller.get_system_tags() or []) + [cls._tag]) + pipeline_controller.set_user_properties(version=version or cls._default_pipeline_version) + if add_run_number: + cls._add_pipeline_name_run_number(pipeline_controller) + print(pipeline_controller.get_output_log_web_page()) + return cls._create_pipeline_controller_from_task(pipeline_controller) + + 
@classmethod + def clone( + cls, + pipeline_controller, # type: Union[PipelineController, str] + name=None, # type: Optional[str] + comment=None, # type: Optional[str] + parent=None, # type: Optional[str] + project=None, # type: Optional[str] + version=None # type: Optional[str] + ): + # type: (...) -> PipelineController + """ + Create a duplicate (a clone) of a pipeline (experiment). The status of the cloned pipeline is ``Draft`` + and modifiable. + + :param str pipeline_controller: The pipeline to clone. Specify a PipelineController object or an ID. + :param str name: The name of the new cloned pipeline. + :param str comment: A comment / description for the new cloned pipeline. + :param str parent: The ID of the parent Task of the new pipeline. + + - If ``parent`` is not specified, then ``parent`` is set to ``source_task.parent``. + - If ``parent`` is not specified and ``source_task.parent`` is not available, + then ``parent`` set to ``source_task``. + + :param str project: The project name in which to create the new pipeline. + If ``None``, the clone inherits the original pipeline's project + :param str version: The version of the new cloned pipeline. 
If ``None``, the clone + inherits the original pipeline's version + + :return: The new cloned PipelineController + """ + if isinstance(pipeline_controller, six.string_types): + pipeline_controller = Task.get_task(task_id=pipeline_controller) + elif isinstance(pipeline_controller, PipelineController): + pipeline_controller = pipeline_controller.task + + if project or name: + pipeline_project_args = cls._create_pipeline_project_args( + name=name or pipeline_controller.name, project=project or pipeline_controller.get_project_name() + ) + project = cls._create_pipeline_projects( + task=pipeline_controller, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) + name = pipeline_project_args["task_name"] + cloned_controller = Task.clone( + source_task=pipeline_controller, name=name, comment=comment, parent=parent, project=project + ) + if version: + cloned_controller.set_user_properties(version=version) + return cls._create_pipeline_controller_from_task(cloned_controller) + @classmethod def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False): # type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any @@ -1555,6 +1724,10 @@ class PipelineController(object): error_msg += ", pipeline_version={}".format(pipeline_version) raise ValueError(error_msg) pipeline_task = Task.get_task(task_id=pipeline_id) + return cls._create_pipeline_controller_from_task(pipeline_task) + + @classmethod + def _create_pipeline_controller_from_task(cls, pipeline_task): pipeline_object = cls.__new__(cls) pipeline_object._task = pipeline_task pipeline_object._nodes = {} @@ -1566,6 +1739,11 @@ class PipelineController(object): pass return pipeline_object + @property + def task(self): + # type: () -> Task + return self._task + @property def id(self): # type: () -> str @@ -2473,7 +2651,7 @@ class PipelineController(object): extra_args = dict() extra_args["project"] = 
self._get_target_project(return_project_id=True) or None # set Task name to match job name - if self._pipeline_as_sub_project: + if self._pipeline_as_sub_project(): extra_args["name"] = node.name if node.explicit_docker_image: extra_args["explicit_docker_image"] = node.explicit_docker_image @@ -2507,6 +2685,7 @@ class PipelineController(object): task_overrides=task_overrides, allow_caching=node.cache_executed_step, output_uri=node.output_uri, + enable_local_imports=self._enable_local_imports, **extra_args ) except Exception: @@ -3194,23 +3373,24 @@ class PipelineController(object): session=self._task.session if self._task else Task.default_session, project_name=self._target_project) - def _add_pipeline_name_run_number(self): + @classmethod + def _add_pipeline_name_run_number(cls, task): # type: () -> None - if not self._task: + if not task: return # if we were already executed, do not rename (meaning aborted pipeline that was continued) # noinspection PyProtectedMember - if self._task._get_runtime_properties().get(self._runtime_property_hash): + if task._get_runtime_properties().get(cls._runtime_property_hash): return # remove the # suffix if we have one: - task_name = re.compile(r" #\d+$").split(self._task.name or "", 1)[0] + task_name = re.compile(r" #\d+$").split(task.name or "", 1)[0] page_size = 100 # find exact name or " #" extension - prev_pipelines_ids = self._task.query_tasks( + prev_pipelines_ids = task.query_tasks( task_name=r"^{}(| #\d+)$".format(task_name), task_filter=dict( - project=[self._task.project], system_tags=[self._tag], + project=[task.project], system_tags=[cls._tag], order_by=['-created'], page_size=page_size, fetch_only_first_page=True, @@ -3223,8 +3403,8 @@ class PipelineController(object): # worst case fail to auto increment try: # we assume we are the latest so let's take a few (last 10) and check the max number - last_task_name = self._task.query_tasks( - task_filter=dict(task_ids=prev_pipelines_ids[:10], 
project=[self._task.project]), + last_task_name = task.query_tasks( + task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]), additional_return_fields=['name'], ) # type: List[Dict] # let's parse the names @@ -3243,7 +3423,7 @@ class PipelineController(object): max_value = 0 if max_value > 1: - self._task.set_name(task_name + " #{}".format(max_value)) + task.set_name(task_name + " #{}".format(max_value)) @classmethod def _get_pipeline_task(cls): @@ -3497,7 +3677,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> () """ @@ -3585,6 +3766,11 @@ class PipelineDecorator(PipelineController): global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). + If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. 
""" super(PipelineDecorator, self).__init__( name=name, @@ -3608,9 +3794,9 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) - # if we are in eager execution, make sure parent class knows it if self._eager_execution_instance: self._mock_execution = True @@ -3898,7 +4084,7 @@ class PipelineDecorator(PipelineController): artifact_serialization_function=self._artifact_serialization_function, artifact_deserialization_function=self._artifact_deserialization_function, skip_global_imports=self._skip_global_imports, - working_dir=working_dir, + working_dir=working_dir ) return task_definition @@ -4429,7 +4615,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> Callable """ @@ -4548,6 +4735,11 @@ class PipelineDecorator(PipelineController): global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). + If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. 
""" def decorator_wrap(func): @@ -4596,7 +4788,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() @@ -4650,7 +4843,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) a_pipeline._args_map = args_map or {} diff --git a/clearml/automation/job.py b/clearml/automation/job.py index 235f23c1..65803198 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -522,6 +522,7 @@ class ClearmlJob(BaseJob): allow_caching=False, # type: bool target_project=None, # type: Optional[str] output_uri=None, # type: Optional[Union[str, bool]] + enable_local_imports=True, # type: bool **kwargs # type: Any ): # type: (...) -> () @@ -545,12 +546,17 @@ class ClearmlJob(BaseJob): If True, use the base_task_id directly (base-task must be in draft-mode / created), :param bool allow_caching: If True, check if we have a previously executed Task with the same specification. If we do, use it and set internal is_cached flag. Default False (always create new Task). - :param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for + :param output_uri: The storage / output url for this job. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). :param str target_project: Optional, Set the target project name to create the cloned Task in. 
+ :param enable_local_imports: If True, allow jobs to import from local files + by appending PYTHONPATH sys.path[0]. + If False, the current path directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. """ super(ClearmlJob, self).__init__() base_temp_task = Task.get_task(task_id=base_task_id) + self._enable_local_imports = enable_local_imports if disable_clone_task: self.task = base_temp_task task_status = self.task.status @@ -717,6 +723,14 @@ class LocalClearmlJob(ClearmlJob): env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id) env['CLEARML_LOG_TASK_TO_BACKEND'] = '1' env['CLEARML_SIMULATE_REMOTE_TASK'] = '1' + try: + if self._enable_local_imports: + current_python_path = env.get("PYTHONPATH") + env["PYTHONPATH"] = ( + "{}:{}".format(current_python_path, sys.path[0]) if current_python_path else sys.path[0] + ) + except Exception as e: + logger.warning("Could not append local path to PYTHONPATH: {}".format(e)) self.task.mark_started() self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env) return True diff --git a/clearml/automation/optimization.py b/clearml/automation/optimization.py index 02c3504b..df7d421c 100644 --- a/clearml/automation/optimization.py +++ b/clearml/automation/optimization.py @@ -1290,7 +1290,7 @@ class HyperParameterOptimizer(object): :param str base_task_id: The Task ID to be used as template experiment to optimize. :param list hyper_parameters: The list of Parameter objects to optimize over. - :param Union[str, Sequence[str]] objective_metric_title: The Objective metric title(s) to maximize / minimize + :param objective_metric_title: The Objective metric title(s) to maximize / minimize (for example, ``validation``, ``["validation", "loss"]``). If ``objective_metric_title`` is a sequence (used to optimize multiple objectives at the same time), then ``objective_metric_series`` and ``objective_metric_sign`` have to be sequences of the same length. 
Each title will be matched diff --git a/clearml/backend_api/session/apimodel.py b/clearml/backend_api/session/apimodel.py index 2d67be9f..7319b150 100644 --- a/clearml/backend_api/session/apimodel.py +++ b/clearml/backend_api/session/apimodel.py @@ -2,7 +2,8 @@ from .datamodel import DataModel class ApiModel(DataModel): - """ API-related data model """ + """API-related data model""" + _service = None _action = None _version = None diff --git a/clearml/backend_api/session/client/client.py b/clearml/backend_api/session/client/client.py index 3fbaf9a0..98c46c64 100644 --- a/clearml/backend_api/session/client/client.py +++ b/clearml/backend_api/session/client/client.py @@ -98,9 +98,7 @@ class StrictSession(Session): """ def init(): - super(StrictSession, self).__init__( - initialize_logging=initialize_logging, *args, **kwargs - ) + super(StrictSession, self).__init__(initialize_logging=initialize_logging, *args, **kwargs) if not config_file: init() @@ -142,17 +140,17 @@ class Response(object): self.response = None self._result = result response = getattr(result, "response", result) - if getattr(response, "_service") == "events" and \ - getattr(response, "_action") in ("scalar_metrics_iter_histogram", - "multi_task_scalar_metrics_iter_histogram", - "vector_metrics_iter_histogram", - ): + if getattr(response, "_service") == "events" and getattr(response, "_action") in ( + "scalar_metrics_iter_histogram", + "multi_task_scalar_metrics_iter_histogram", + "vector_metrics_iter_histogram", + ): # put all the response data under metrics: response.metrics = result.response_data # noinspection PyProtectedMember - if 'metrics' not in response.__class__._get_data_props(): + if "metrics" not in response.__class__._get_data_props(): # noinspection PyProtectedMember - response.__class__._data_props_list['metrics'] = 'metrics' + response.__class__._data_props_list["metrics"] = "metrics" if dest: response = getattr(response, dest) self.response = response @@ -170,11 +168,7 @@ class 
Response(object): return repr(self.response) def __dir__(self): - fields = [ - name - for name in dir(self.response) - if isinstance(getattr(type(self.response), name, None), property) - ] + fields = [name for name in dir(self.response) if isinstance(getattr(type(self.response), name, None), property)] return list(set(chain(super(Response, self).__dir__(), fields)) - {"response"}) @@ -224,7 +218,7 @@ class TableResponse(Response): return "" if result is None else result fields = fields or self.fields - return '\n'.join(str(dict((attr, getter(item, attr)) for attr in fields)) for item in self) + return "\n".join(str(dict((attr, getter(item, attr)) for attr in fields)) for item in self) def display(self, fields=None): print(self._format_table(fields=fields)) @@ -251,10 +245,7 @@ class TableResponse(Response): item for item in self if (not predicate or predicate(item)) - and all( - compare_enum(getattr(item, key), value) - for key, value in kwargs.items() - ) + and all(compare_enum(getattr(item, key), value) for key, value in kwargs.items()) ], ) @@ -393,9 +384,7 @@ def make_action(service, request_cls): @wrap def get(self, *args, **kwargs): - return entity( - self, self.session.send(request_cls(*args, **kwargs)).response - ) + return entity(self, self.session.send(request_cls(*args, **kwargs)).response) elif action == "create": @@ -403,9 +392,7 @@ def make_action(service, request_cls): def get(self, *args, **kwargs): return entity( self, - Namespace( - id=self.session.send(request_cls(*args, **kwargs)).response.id - ), + Namespace(id=self.session.send(request_cls(*args, **kwargs)).response.id), ) elif action in ["get_all", "get_all_ex"]: @@ -458,7 +445,8 @@ def get_requests(service): return OrderedDict( (key, value) for key, value in sorted( - vars(service.__wrapped__ if hasattr(service, '__wrapped__') else service).items(), key=itemgetter(0)) + vars(service.__wrapped__ if hasattr(service, "__wrapped__") else service).items(), key=itemgetter(0) + ) if 
isinstance(value, type) and issubclass(value, APIRequest) and value._action ) @@ -476,10 +464,7 @@ def make_service_class(module): ] ) properties.update( - (f.__name__, f) - for f in ( - make_action(module, value) for key, value in get_requests(module).items() - ) + (f.__name__, f) for f in (make_action(module, value) for key, value in get_requests(module).items()) ) # noinspection PyTypeChecker return type(str(module_name(module)), (Service,), properties) @@ -504,14 +489,12 @@ class Version(Entity): except AttributeError: published = False - self.data = self._service.get_versions( - dataset=self.dataset, only_published=published, versions=[self.id] - )[0].data + self.data = self._service.get_versions(dataset=self.dataset, only_published=published, versions=[self.id])[ + 0 + ].data def _get_default_kwargs(self): - return dict( - super(Version, self)._get_default_kwargs(), **{"dataset": self.data.dataset} - ) + return dict(super(Version, self)._get_default_kwargs(), **{"dataset": self.data.dataset}) class APIClient(object): @@ -549,18 +532,13 @@ class APIClient(object): if mod ) else: - services = OrderedDict( - (name, getattr(_api_services, name)) for name in _api_services.__all__ - ) + services = OrderedDict((name, getattr(_api_services, name)) for name in _api_services.__all__) self._update_services(services) def _update_services(self, services): # type: (Dict[str, types.ModuleType]) -> () self.__dict__.update( dict( - { - name: make_service_class(module)(self.session) - for name, module in services.items() - }, + {name: make_service_class(module)(self.session) for name, module in services.items()}, ) ) diff --git a/clearml/backend_api/session/session.py b/clearml/backend_api/session/session.py index b62673f7..effe7397 100644 --- a/clearml/backend_api/session/session.py +++ b/clearml/backend_api/session/session.py @@ -204,6 +204,7 @@ class Session(TokenManager): "api.auth.request_token_expiration_sec", self.config.get("api.auth.req_token_expiration_sec", None) ) 
+ self.__auth_token = None self._update_default_api_method() if ENV_AUTH_TOKEN.get(): @@ -379,6 +380,11 @@ class Session(TokenManager): case (only once). This is done since permissions are embedded in the token, and addresses a case where server-side permissions have changed but are not reflected in the current token. Refreshing the token will generate a token with the updated permissions. + + NOTE: This method does not handle authorization. Credentials or token should be provided using the auth or + headers arguments, otherwise a successful authorization depends on the session containing a valid cookie + set during the last login call (which may not be there if the server's cookie domain does not match the URL + we use to access the server) """ if self._offline_mode: return None @@ -842,13 +848,27 @@ class Session(TokenManager): ) ) + auth = None headers = None - # use token only once (the second time the token is already built into the http session) - if self.__auth_token: - headers = dict(Authorization="Bearer {}".format(self.__auth_token)) - self.__auth_token = None - auth = HTTPBasicAuth(self.access_key, self.secret_key) if self.access_key and self.secret_key else None + token = None + if self.__auth_token: # try using initially provided token + token = self.__auth_token + elif self.access_key and self.secret_key: # otherwise, try using basic auth (if key/secret exists) + auth = HTTPBasicAuth(self.access_key, self.secret_key) + else: # otherwise, use the latest raw token + token = self.raw_token + + if token: + headers = dict(Authorization="Bearer {}".format(token)) + + if not auth and not headers: + # No authorization info, something went wrong + self._logger.warning( + "refreshing token with no authorization info (no token or credentials, this might fail " + "if session does not have a valid cookie)" + ) + res = None try: res = self._send_request( @@ -878,6 +898,10 @@ class Session(TokenManager): if ENV_AUTH_TOKEN.get(): 
ENV_AUTH_TOKEN.set(resp["data"]["token"]) + # in any case, the initial token should only be used once (but only do it in case we actually managed + # to generate a new token in case this failed due to transient reasons) + self.__auth_token = None + return resp["data"]["token"] except LoginError: six.reraise(*sys.exc_info()) diff --git a/clearml/backend_config/utils.py b/clearml/backend_config/utils.py index 098a2216..4e02c1f3 100644 --- a/clearml/backend_config/utils.py +++ b/clearml/backend_config/utils.py @@ -11,12 +11,12 @@ if TYPE_CHECKING: def get_items(cls): - """ get key/value items from an enum-like class (members represent enumeration key/value) """ - return {k: v for k, v in vars(cls).items() if not k.startswith('_')} + """get key/value items from an enum-like class (members represent enumeration key/value)""" + return {k: v for k, v in vars(cls).items() if not k.startswith("_")} def get_options(cls): - """ get options from an enum-like class (members represent enumeration key/value) """ + """get options from an enum-like class (members represent enumeration key/value)""" return get_items(cls).values() diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index 42e9e9ef..13c574f3 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -231,7 +231,7 @@ class CreateAndPopulate(object): repo_info.script["diff"] = a_repo_info.script["diff"] or "" repo_info.script["entry_point"] = a_repo_info.script["entry_point"] if a_create_requirements: - repo_info["requirements"] = a_repo_info.script.get("requirements") or {} + repo_info.script["requirements"] = a_repo_info.script.get("requirements") or {} # check if we have no repository and no requirements raise error if ( diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 331c7343..b83a4d50 100644 --- a/clearml/backend_interface/task/task.py +++ 
b/clearml/backend_interface/task/task.py @@ -695,6 +695,28 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): ignore_errors=ignore_errors ) + def stop_request(self, ignore_errors=True, force=False, status_message=None): + # type: (bool, bool, Optional[str]) -> () + """ + Request a task to stop. this will not change the task status + but mark a request for an agent or SDK to actually stop the Task. + This will trigger the Task's abort callback, and at the end will + change the task status to stopped and kill the Task's processes + + Notice: calling this on your own Task, will cause + the watchdog to call the on_abort callback and kill the process + + :param bool force: If not True, call fails if the task status is not 'in_progress' + :param bool ignore_errors: if False raise exception on error + :param str status_message: Optional, add status change message to the stop request. + This message will be stored as status_message on the Task's info panel + """ + # request task stop + return self.send( + tasks.StopRequest(self.id, force=force, status_reason="abort request", status_message=status_message), + ignore_errors=ignore_errors + ) + def completed(self, ignore_errors=True): # type: (bool) -> () """ diff --git a/clearml/binding/artifacts.py b/clearml/binding/artifacts.py index 093c80d7..2d04a80d 100644 --- a/clearml/binding/artifacts.py +++ b/clearml/binding/artifacts.py @@ -257,6 +257,7 @@ class Artifacts(object): max_preview_size_bytes = 65536 _flush_frequency_sec = 300. 
+ _max_tmp_file_replace_attemps = 3 # notice these two should match _save_format = '.csv.gz' _compression = 'gzip' @@ -1138,7 +1139,21 @@ class Artifacts(object): temp_folder, prefix, suffix = self._temp_files_lookup.pop(local_filename) fd, temp_filename = mkstemp(prefix=prefix, suffix=suffix) os.close(fd) - os.replace(local_filename, temp_filename) + + for i in range(self._max_tmp_file_replace_attemps): + try: + os.replace(local_filename, temp_filename) + break + except PermissionError: + LoggerRoot.get_base_logger().warning( + "Failed to replace {} with {}. Attemps left: {}".format( + local_filename, temp_filename, self._max_tmp_file_replace_attemps - i + ) + ) + else: + # final attempt, and if it fails, throw an exception + # exception could be thrown on some Windows systems + os.replace(local_filename, temp_filename) local_filename = temp_filename os.rmdir(temp_folder) except Exception as ex: diff --git a/clearml/datasets/__init__.py b/clearml/datasets/__init__.py index 84f8c2a8..c1cbe369 100644 --- a/clearml/datasets/__init__.py +++ b/clearml/datasets/__init__.py @@ -1,6 +1,6 @@ from .dataset import FileEntry, Dataset __all__ = [ - "FileEntry", - "Dataset", + "FileEntry", + "Dataset", ] diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 4ac0e001..35f49c16 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -552,7 +552,7 @@ class Dataset(object): k: v for k, v in self._dataset_link_entries.items() if not matches_any_wildcard(k, dataset_path, recursive=recursive) - and not matches_any_wildcard(v.link, dataset_path, recursive=recursive) + and not (matches_any_wildcard(v.link, dataset_path, recursive=recursive) or v.link == dataset_path) } removed = 0 @@ -2263,14 +2263,14 @@ class Dataset(object): def _get_dataset_files( self, - force=False, - selected_chunks=None, - lock_target_folder=False, - cleanup_target_folder=True, - target_folder=None, - max_workers=None + force=False, # type: bool + selected_chunks=None, 
# type: Optional[List[int]] + lock_target_folder=False, # type: bool + cleanup_target_folder=True, # type: bool + target_folder=None, # type: Optional[Path] + max_workers=None, # type: Optional[int] + link_entries_of_interest=None # type: Optional[Dict[str, LinkEntry]] ): - # type: (bool, Optional[List[int]], bool, bool, Optional[Path], Optional[int]) -> str """ First, extracts the archive present on the ClearML server containing this dataset's files. Then, download the remote files. Note that if a remote file was added to the ClearML server, then @@ -2287,6 +2287,8 @@ class Dataset(object): :param target_folder: If provided use the specified target folder, default, auto generate from Dataset ID. :param max_workers: Number of threads to be spawned when getting dataset files. Defaults to the number of virtual cores. + :param link_entries_of_interest: Download only the external files in this dictionary. + Useful when one doesn't want to download all the files in a parent dataset, as some files might be removed :return: Path to the local storage where the data was downloaded """ @@ -2300,14 +2302,21 @@ class Dataset(object): max_workers=max_workers ) self._download_external_files( - target_folder=target_folder, lock_target_folder=lock_target_folder, max_workers=max_workers + target_folder=target_folder, + lock_target_folder=lock_target_folder, + max_workers=max_workers, + link_entries_of_interest=link_entries_of_interest, ) return local_folder def _download_external_files( - self, target_folder=None, lock_target_folder=False, max_workers=None + self, + target_folder=None, + lock_target_folder=False, + max_workers=None, + link_entries_of_interest=None ): - # (Union(Path, str), bool) -> None + # (Union(Path, str), bool, Optional[int], Optional[Dict[str, LinkEntry]]) -> None """ Downloads external files in the dataset. These files will be downloaded at relative_path (the path relative to the target_folder). 
Note that @@ -2318,6 +2327,8 @@ class Dataset(object): :param lock_target_folder: If True, local the target folder so the next cleanup will not delete Notice you should unlock it manually, or wait for the process to finish for auto unlocking. :param max_workers: Number of threads to be spawned when getting dataset files. Defaults to no multi-threading. + :param link_entries_of_interest: Download only the external files in this dictionary. + Useful when one doesn't want to download all the files in a parent dataset, as some files might be removed """ def _download_link(link, target_path): if os.path.exists(target_path): @@ -2370,12 +2381,13 @@ class Dataset(object): )[0] ).as_posix() + link_entries_of_interest = link_entries_of_interest or self._dataset_link_entries if not max_workers: - for relative_path, link in self._dataset_link_entries.items(): + for relative_path, link in link_entries_of_interest.items(): _submit_download_link(relative_path, link, target_folder) else: with ThreadPoolExecutor(max_workers=max_workers) as pool: - for relative_path, link in self._dataset_link_entries.items(): + for relative_path, link in link_entries_of_interest.items(): _submit_download_link(relative_path, link, target_folder, pool=pool) def _extract_dataset_archive( @@ -3224,7 +3236,8 @@ class Dataset(object): force=force, lock_target_folder=True, cleanup_target_folder=False, - max_workers=max_workers + max_workers=max_workers, + link_entries_of_interest=self._dataset_link_entries )) ds_base_folder.touch() diff --git a/clearml/debugging/__init__.py b/clearml/debugging/__init__.py index b48c7f9f..d3631b11 100644 --- a/clearml/debugging/__init__.py +++ b/clearml/debugging/__init__.py @@ -1,7 +1,22 @@ """ Debugging module """ from .timer import Timer -from .log import get_logger, get_null_logger, TqdmLog, add_options as add_log_options, \ - apply_logging_args as parse_log_args, add_rotating_file_handler, add_time_rotating_file_handler +from .log import ( + get_logger, + 
get_null_logger, + TqdmLog, + add_options as add_log_options, + apply_logging_args as parse_log_args, + add_rotating_file_handler, + add_time_rotating_file_handler, +) -__all__ = ["Timer", "get_logger", "get_null_logger", "TqdmLog", "add_log_options", "parse_log_args", - "add_rotating_file_handler", "add_time_rotating_file_handler"] +__all__ = [ + "Timer", + "get_logger", + "get_null_logger", + "TqdmLog", + "add_log_options", + "parse_log_args", + "add_rotating_file_handler", + "add_time_rotating_file_handler", +] diff --git a/clearml/debugging/trace.py b/clearml/debugging/trace.py index fb4fa89b..a5570ae8 100644 --- a/clearml/debugging/trace.py +++ b/clearml/debugging/trace.py @@ -21,7 +21,7 @@ def _thread_linux_id(): def _thread_py_id(): # return threading.get_ident() - return zipfile.crc32(int(threading.get_ident()).to_bytes(8, 'little')) + return zipfile.crc32(int(threading.get_ident()).to_bytes(8, "little")) def _log_stderr(name, fnc, args, kwargs, is_return): @@ -32,20 +32,22 @@ def _log_stderr(name, fnc, args, kwargs, is_return): return if __trace_level not in (1, 2, -1, -2): return - fnc_address = str(fnc).split(' at ') - fnc_address = '{}'.format(fnc_address[-1].replace('>', '')) if len(fnc_address) > 1 else '' + fnc_address = str(fnc).split(" at ") + fnc_address = "{}".format(fnc_address[-1].replace(">", "")) if len(fnc_address) > 1 else "" if __trace_level == 1 or __trace_level == -1: - t = '{:14} {}'.format(fnc_address, name) + t = "{:14} {}".format(fnc_address, name) elif __trace_level == 2 or __trace_level == -2: - a_args = str(args)[1:-1] if args else '' - a_kwargs = ' {}'.format(kwargs) if kwargs else '' - t = '{:14} {} ({}{})'.format(fnc_address, name, a_args, a_kwargs) + a_args = str(args)[1:-1] if args else "" + a_kwargs = " {}".format(kwargs) if kwargs else "" + t = "{:14} {} ({}{})".format(fnc_address, name, a_args, a_kwargs) # get a nicer thread id h = int(__thread_id()) ts = time.time() - __trace_start - __stream_write('{}{:<9.3f}:{:5}:{:8x}: 
[{}] {}\n'.format( - '-' if is_return else '', ts, os.getpid(), - h, threading.current_thread().name, t)) + __stream_write( + "{}{:<9.3f}:{:5}:{:8x}: [{}] {}\n".format( + "-" if is_return else "", ts, os.getpid(), h, threading.current_thread().name, t + ) + ) if __stream_flush: __stream_flush() except Exception: @@ -64,6 +66,7 @@ def _traced_call_method(name, fnc): if r: raise r return ret + return _traced_call_int @@ -82,7 +85,7 @@ def _traced_call_cls(name, fnc): raise r return ret - return WrapperClass.__dict__['_traced_call_int'] + return WrapperClass.__dict__["_traced_call_int"] def _traced_call_static(name, fnc): @@ -99,7 +102,8 @@ def _traced_call_static(name, fnc): if r: raise r return ret - return WrapperStatic.__dict__['_traced_call_int'] + + return WrapperStatic.__dict__["_traced_call_int"] def _traced_call_func(name, fnc): @@ -114,14 +118,16 @@ def _traced_call_func(name, fnc): if r: raise r return ret + return _traced_call_int -def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_prefixes=[], only_prefix=[]): +def _patch_module(module, prefix="", basepath=None, basemodule=None, exclude_prefixes=[], only_prefix=[]): if isinstance(module, str): if basemodule is None: - basemodule = module + '.' + basemodule = module + "." 
import importlib + importlib.import_module(module) module = sys.modules.get(module) if not module: @@ -130,8 +136,8 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre basepath = os.path.sep.join(module.__file__.split(os.path.sep)[:-1]) + os.path.sep # only sub modules - if not hasattr(module, '__file__') or (inspect.ismodule(module) and not module.__file__.startswith(basepath)): - if hasattr(module, '__module__') and module.__module__.startswith(basemodule): + if not hasattr(module, "__file__") or (inspect.ismodule(module) and not module.__file__.startswith(basepath)): + if hasattr(module, "__module__") and module.__module__.startswith(basemodule): # this is one of ours pass else: @@ -139,25 +145,25 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre return # Do not patch ourselves - if hasattr(module, '__file__') and module.__file__ == __file__: + if hasattr(module, "__file__") and module.__file__ == __file__: return - prefix += module.__name__.split('.')[-1] + '.' + prefix += module.__name__.split(".")[-1] + "." # Do not patch low level network layer - if prefix.startswith('clearml.backend_api.session.') and prefix != 'clearml.backend_api.session.': - if not prefix.endswith('.Session.') and '.token_manager.' not in prefix: + if prefix.startswith("clearml.backend_api.session.") and prefix != "clearml.backend_api.session.": + if not prefix.endswith(".Session.") and ".token_manager." 
not in prefix: # print('SKIPPING: {}'.format(prefix)) return - if prefix.startswith('clearml.backend_api.services.'): + if prefix.startswith("clearml.backend_api.services."): return for skip in exclude_prefixes: if prefix.startswith(skip): return - for fn in (m for m in dir(module) if not m.startswith('__')): - if fn in ('schema_property') or fn.startswith('_PostImportHookPatching__'): + for fn in (m for m in dir(module) if not m.startswith("__")): + if fn in ("schema_property") or fn.startswith("_PostImportHookPatching__"): continue # noinspection PyBroadException try: @@ -165,27 +171,39 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre except Exception: continue if inspect.ismodule(fnc): - _patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule, - exclude_prefixes=exclude_prefixes, only_prefix=only_prefix) + _patch_module( + fnc, + prefix=prefix, + basepath=basepath, + basemodule=basemodule, + exclude_prefixes=exclude_prefixes, + only_prefix=only_prefix, + ) elif inspect.isclass(fnc): - _patch_module(fnc, prefix=prefix, basepath=basepath, basemodule=basemodule, - exclude_prefixes=exclude_prefixes, only_prefix=only_prefix) + _patch_module( + fnc, + prefix=prefix, + basepath=basepath, + basemodule=basemodule, + exclude_prefixes=exclude_prefixes, + only_prefix=only_prefix, + ) elif inspect.isroutine(fnc): - if only_prefix and all(p not in (prefix+str(fn)) for p in only_prefix): + if only_prefix and all(p not in (prefix + str(fn)) for p in only_prefix): continue for skip in exclude_prefixes: - if (prefix+str(fn)).startswith(skip): + if (prefix + str(fn)).startswith(skip): continue # _log_stderr('Patching: {}'.format(prefix+fn)) if inspect.isclass(module): # check if this is even in our module - if hasattr(fnc, '__module__') and fnc.__module__ != module.__module__: + if hasattr(fnc, "__module__") and fnc.__module__ != module.__module__: pass # print('not ours {} {}'.format(module, fnc)) - elif hasattr(fnc, 
'__qualname__') and fnc.__qualname__.startswith(module.__name__ + '.'): + elif hasattr(fnc, "__qualname__") and fnc.__qualname__.startswith(module.__name__ + "."): if isinstance(module.__dict__[fn], classmethod): setattr(module, fn, _traced_call_cls(prefix + fn, fnc)) elif isinstance(module.__dict__[fn], staticmethod): @@ -194,7 +212,7 @@ def _patch_module(module, prefix='', basepath=None, basemodule=None, exclude_pre setattr(module, fn, _traced_call_method(prefix + fn, fnc)) else: # probably not ours hopefully static function - if hasattr(fnc, '__qualname__') and not fnc.__qualname__.startswith(module.__name__ + '.'): + if hasattr(fnc, "__qualname__") and not fnc.__qualname__.startswith(module.__name__ + "."): pass # print('not ours {} {}'.format(module, fnc)) else: # we should not get here @@ -226,17 +244,18 @@ def trace_trains(stream=None, level=1, exclude_prefixes=[], only_prefix=[]): return __patched_trace = True if not __thread_id: - if sys.platform == 'linux': + if sys.platform == "linux": import ctypes - __thread_so = ctypes.cdll.LoadLibrary('libc.so.6') + + __thread_so = ctypes.cdll.LoadLibrary("libc.so.6") __thread_id = _thread_linux_id else: __thread_id = _thread_py_id - stderr_write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write + stderr_write = sys.stderr._original_write if hasattr(sys.stderr, "_original_write") else sys.stderr.write if stream: if isinstance(stream, str): - stream = open(stream, 'w') + stream = open(stream, "w") __stream_write = stream.write __stream_flush = stream.flush else: @@ -244,15 +263,16 @@ def trace_trains(stream=None, level=1, exclude_prefixes=[], only_prefix=[]): __stream_flush = None from ..version import __version__ - msg = 'ClearML v{} - Starting Trace\n\n'.format(__version__) + + msg = "ClearML v{} - Starting Trace\n\n".format(__version__) # print to actual stderr stderr_write(msg) # store to stream __stream_write(msg) - __stream_write('{:9}:{:5}:{:8}: 
{:14}\n'.format('seconds', 'pid', 'tid', 'self')) - __stream_write('{:9}:{:5}:{:8}:{:15}\n'.format('-' * 9, '-' * 5, '-' * 8, '-' * 15)) + __stream_write("{:9}:{:5}:{:8}: {:14}\n".format("seconds", "pid", "tid", "self")) + __stream_write("{:9}:{:5}:{:8}:{:15}\n".format("-" * 9, "-" * 5, "-" * 8, "-" * 15)) __trace_start = time.time() - _patch_module('clearml', exclude_prefixes=exclude_prefixes or [], only_prefix=only_prefix or []) + _patch_module("clearml", exclude_prefixes=exclude_prefixes or [], only_prefix=only_prefix or []) def trace_level(level=1): @@ -286,25 +306,25 @@ def print_traced_files(glob_mask, lines_per_tid=5, stream=sys.stdout, specify_pi from glob import glob def hash_line(a_line): - return hash(':'.join(a_line.split(':')[1:])) + return hash(":".join(a_line.split(":")[1:])) pids = {} orphan_calls = set() print_orphans = False for fname in glob(glob_mask, recursive=False): - with open(fname, 'rt') as fd: + with open(fname, "rt") as fd: lines = fd.readlines() for line in lines: # noinspection PyBroadException try: - _, pid, tid = line.split(':')[:3] + _, pid, tid = line.split(":")[:3] pid = int(pid) except Exception: continue if specify_pids and pid not in specify_pids: continue - if line.startswith('-'): + if line.startswith("-"): print_orphans = True line = line[1:] h = hash_line(line) @@ -323,16 +343,16 @@ def print_traced_files(glob_mask, lines_per_tid=5, stream=sys.stdout, specify_pi by_time = {} for p, tids in pids.items(): for t, lines in tids.items(): - ts = float(lines[-1].split(':')[0].strip()) + 0.000001 * len(by_time) + ts = float(lines[-1].split(":")[0].strip()) + 0.000001 * len(by_time) if print_orphans: for i, line in enumerate(lines): if i > 0 and hash_line(line) in orphan_calls: - lines[i] = ' ### Orphan ### {}'.format(line) - by_time[ts] = ''.join(lines) + '\n' + lines[i] = " ### Orphan ### {}".format(line) + by_time[ts] = "".join(lines) + "\n" - out_stream = open(stream, 'w') if isinstance(stream, str) else stream + out_stream = 
open(stream, "w") if isinstance(stream, str) else stream for k in sorted(by_time.keys()): - out_stream.write(by_time[k] + '\n') + out_stream.write(by_time[k] + "\n") if isinstance(stream, str): out_stream.close() @@ -345,11 +365,11 @@ def end_of_program(): def stdout_print(*args, **kwargs): if len(args) == 1 and not kwargs: line = str(args[0]) - if not line.endswith('\n'): - line += '\n' + if not line.endswith("\n"): + line += "\n" else: - line = '{} {}\n'.format(args or '', kwargs or '') - if hasattr(sys.stdout, '_original_write'): + line = "{} {}\n".format(args or "", kwargs or "") + if hasattr(sys.stdout, "_original_write"): sys.stdout._original_write(line) else: sys.stdout.write(line) @@ -361,16 +381,18 @@ def debug_print(*args, **kwargs): Example: [pid=123, t=0.003] message here """ global tic - tic = globals().get('tic', time.time()) + tic = globals().get("tic", time.time()) stdout_print( - "\033[1;33m[pid={}, t={:.04f}] ".format(os.getpid(), time.time()-tic) - + str(args[0] if len(args) == 1 else ("" if not args else args)) + "\033[0m", **kwargs - ) + "\033[1;33m[pid={}, t={:.04f}] ".format(os.getpid(), time.time() - tic) + + str(args[0] if len(args) == 1 else ("" if not args else args)) + + "\033[0m", + **kwargs + ) tic = time.time() -if __name__ == '__main__': +if __name__ == "__main__": # from clearml import Task # task = Task.init(project_name="examples", task_name="trace test") # trace_trains('_trace.txt', level=2) - print_traced_files('_trace_*.txt', lines_per_tid=10) + print_traced_files("_trace_*.txt", lines_per_tid=10) diff --git a/clearml/logger.py b/clearml/logger.py index fe0a19df..ee3e35c3 100644 --- a/clearml/logger.py +++ b/clearml/logger.py @@ -230,7 +230,7 @@ class Logger(object): :param xaxis: The x-axis title. (Optional) :param yaxis: The y-axis title. (Optional) :param mode: Multiple histograms mode, stack / group / relative. Default is 'group'. 
- :param extra_layout: optional dictionary for layout configuration, passed directly to plotly + :param extra_layout: Optional dictionary for layout configuration, passed directly to plotly. See full details on the supported configuration: https://plotly.com/javascript/reference/layout/ example: ``extra_layout={'showlegend': False, 'plot_bgcolor': 'yellow'}`` """ @@ -266,6 +266,7 @@ class Logger(object): data_args=None, # type: Optional[dict] extra_layout=None, # type: Optional[dict] ): + # type: (...) -> () """ For explicit reporting, plot a (default grouped) histogram. Notice this function will not calculate the histogram, @@ -590,12 +591,13 @@ class Logger(object): comment=None, # type: Optional[str] extra_layout=None, # type: Optional[dict] ): + # type: (...) -> () """ For explicit reporting, plot a 3d scatter graph (with markers). :param str title: The title (metric) of the plot. :param str series: The series name (variant) of the reported scatter plot. - :param Union[numpy.ndarray, list] scatter: The scatter data. + :param scatter: The scatter data. list of (pairs of x,y,z), list of series [[(x1,y1,z1)...]], or numpy.ndarray :param int iteration: The reported iteration / step. :param str xaxis: The x-axis title. (Optional) diff --git a/clearml/model.py b/clearml/model.py index 0c76426b..58813cfe 100644 --- a/clearml/model.py +++ b/clearml/model.py @@ -818,12 +818,13 @@ class BaseModel(object): comment=None, # type: Optional[str] extra_layout=None # type: Optional[dict] ): + # type: (...) -> () """ For explicit reporting, plot a 3d scatter graph (with markers). :param str title: The title (metric) of the plot. :param str series: The series name (variant) of the reported scatter plot. - :param Union[numpy.ndarray, list] scatter: The scatter data. + :param scatter: The scatter data. list of (pairs of x,y,z), list of series [[(x1,y1,z1)...]], or numpy.ndarray :param int iteration: The reported iteration / step. :param str xaxis: The x-axis title. 
(Optional) diff --git a/clearml/router/__init__.py b/clearml/router/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/clearml/router/endpoint_telemetry.py b/clearml/router/endpoint_telemetry.py new file mode 100644 index 00000000..47f45d85 --- /dev/null +++ b/clearml/router/endpoint_telemetry.py @@ -0,0 +1,236 @@ +import copy +import time +import uuid +from threading import Thread + +from ..task import Task +from ..utilities.resource_monitor import ResourceMonitor + + +class EndpointTelemetry: + BACKEND_STAT_MAP = { + "cpu_usage_*": "cpu_usage", + "cpu_temperature_*": "cpu_temperature", + "disk_free_percent": "disk_free_home", + "io_read_mbs": "disk_read", + "io_write_mbs": "disk_write", + "network_tx_mbs": "network_tx", + "network_rx_mbs": "network_rx", + "memory_free_gb": "memory_free", + "memory_used_gb": "memory_used", + "gpu_temperature_*": "gpu_temperature", + "gpu_mem_used_gb_*": "gpu_memory_used", + "gpu_mem_free_gb_*": "gpu_memory_free", + "gpu_utilization_*": "gpu_usage", + } + + def __init__( + self, + endpoint_name="endpoint", + model_name="model", + model=None, + model_url=None, + model_source=None, + model_version=None, + app_id=None, + app_instance=None, + tags=None, + system_tags=None, + container_id=None, + input_size=None, + input_type="str", + report_statistics=True, + endpoint_url=None, + preprocess_artifact=None, + force_register=False, + ): + self.report_window = 30 + self._previous_readouts = {} + self._previous_readouts_ts = time.time() + self._num_readouts = 0 + self.container_info = { + "container_id": container_id or str(uuid.uuid4()).replace("-", ""), + "endpoint_name": endpoint_name, + "model_name": model_name, + "model_source": model_source, + "model_version": model_version, + "preprocess_artifact": preprocess_artifact, + "input_type": str(input_type), + "input_size": str(input_size), + "tags": tags, + "system_tags": system_tags, + "endpoint_url": endpoint_url, + } + references = [] + if app_id: + 
references.append({"type": "app_id", "value": app_id}) + if app_instance: + references.append({"type": "app_instance", "value": app_instance}) + references.append({"type": "task", "value": Task.current_task().id}) + if model: + references.append({"type": "model", "value": model}) + if model_url: + references.append({"type": "url", "value": model_url}) + self.container_info["reference"] = references + self.session = Task._get_default_session() + self.requests_num = 0 + self.requests_num_window = 0 + self.requests_num_prev_window = 0 + self.latency_sum_window = 0 + self.uptime_timestamp = time.time() + self.last_request_time = None + # use readily available resource monitor, otherwise create one (can happen in spawned subprocesses) + self.resource_monitor = Task.current_task()._resource_monitor or ResourceMonitor(Task.current_task()) + if not container_id and not force_register: + self.register_container() + self._stop_container_status_report_daemon = False + if report_statistics: + Thread(target=self.container_status_report_daemon, daemon=True).start() + + def stop(self): + self._stop_container_status_report_daemon = True + + def update( + self, + endpoint_name=None, + model_name=None, + model=None, + model_url=None, + model_source=None, + model_version=None, + tags=None, + system_tags=None, + input_size=None, + input_type=None, + endpoint_url=None, + preprocess_artifact=None, + ): + update_dict = {} + if endpoint_name is not None: + update_dict["endpoint_name"] = endpoint_name + if model_name is not None: + update_dict["model_name"] = model_name + if model_source is not None: + update_dict["model_source"] = model_source + if model_version is not None: + update_dict["model_version"] = model_version + if preprocess_artifact is not None: + update_dict["preprocess_artifact"] = preprocess_artifact + if input_type is not None: + update_dict["input_type"] = input_type + if input_size is not None: + update_dict["input_size"] = input_size + if tags is not None: + 
update_dict["tags"] = tags + if system_tags is not None: + update_dict["system_tags"] = system_tags + if endpoint_url is not None: + update_dict["endpoint_url"] = endpoint_url + self.container_info.update(update_dict) + references_to_add = {} + if model: + references_to_add["model"] = {"type": "model", "value": model} + if model_url: + references_to_add["model_url"] = {"type": "url", "value": model_url} + for reference in self.container_info["reference"]: + if reference["type"] in references_to_add: + reference["value"] = references_to_add[reference["type"]]["value"] + references_to_add.pop(reference["type"], None) + self.container_info["reference"].extend(list(references_to_add.values())) + + def register_container(self): + result = self.session.send_request("serving", "register_container", json=self.container_info) + if result.status_code != 200: + print("Failed registering container: {}".format(result.json())) + + def wait_for_endpoint_url(self): + while not self.container_info.get("endpoint_url"): + Task.current_task().reload() + endpoint = Task.current_task()._get_runtime_properties().get("endpoint") + if endpoint: + self.container_info["endpoint_url"] = endpoint + self.uptime_timestamp = time.time() + else: + time.sleep(1) + + def get_machine_stats(self): + def create_general_key(old_key): + return "{}_*".format(old_key) + + stats = self.resource_monitor._machine_stats() + elapsed = time.time() - self._previous_readouts_ts + self._previous_readouts_ts = time.time() + updates = {} + for k, v in stats.items(): + if k.endswith("_mbs"): + v = (v - self._previous_readouts.get(k, v)) / elapsed + updates[k] = v + self._previous_readouts = copy.deepcopy(stats) + stats.update(updates) + self._num_readouts += 1 + + preprocessed_stats = {} + ordered_keys = sorted(stats.keys()) + for k in ordered_keys: + v = stats[k] + if k in ["memory_used_gb", "memory_free_gb"]: + v *= 1024 + if isinstance(v, float): + v = round(v, 3) + stat_key = self.BACKEND_STAT_MAP.get(k) + if 
stat_key: + preprocessed_stats[stat_key] = v + else: + general_key = create_general_key(k) + if general_key.startswith("gpu"): + prev_general_key = general_key + general_key = "_".join(["gpu"] + general_key.split("_")[2:]) + if general_key == "gpu_mem_used_gb_*": + gpu_index = prev_general_key.split("_")[1] + mem_usage = min(stats["gpu_{}_mem_usage".format(gpu_index)], 99.99) + mem_free = stats["gpu_{}_mem_free_gb".format(gpu_index)] + v = (mem_usage * mem_free) / (100 - mem_usage) + if general_key in ["gpu_mem_used_gb_*", "gpu_mem_free_gb_*"]: + v *= 1024 + general_key = self.BACKEND_STAT_MAP.get(general_key) + if general_key: + preprocessed_stats.setdefault(general_key, []).append(v) + return preprocessed_stats + + def container_status_report_daemon(self): + while not self._stop_container_status_report_daemon: + self.container_status_report() + time.sleep(self.report_window) + + def container_status_report(self): + self.wait_for_endpoint_url() + status_report = {**self.container_info} + status_report["uptime_sec"] = int(time.time() - self.uptime_timestamp) + status_report["requests_num"] = self.requests_num + status_report["requests_min"] = self.requests_num_window + self.requests_num_prev_window + status_report["latency_ms"] = ( + 0 if (self.requests_num_window == 0) else (self.latency_sum_window / self.requests_num_window) + ) + status_report["machine_stats"] = self.get_machine_stats() + self.requests_num_prev_window = self.requests_num_window + self.requests_num_window = 0 + self.latency_sum_window = 0 + self.latency_num_window = 0 + result = self.session.send_request("serving", "container_status_report", json=status_report) + if result.status_code != 200: + print("Failed sending status report: {}".format(result.json())) + + def update_last_request_time(self): + self.last_request_time = time.time() + + def update_statistics(self): + self.requests_num += 1 + self.requests_num_window += 1 + latency = (time.time() - self.last_request_time) * 1000 + 
self.latency_sum_window += latency + + def on_request(self): + self.update_last_request_time() + + def on_response(self): + self.update_statistics() diff --git a/clearml/router/fastapi_proxy.py b/clearml/router/fastapi_proxy.py new file mode 100644 index 00000000..3b781b0f --- /dev/null +++ b/clearml/router/fastapi_proxy.py @@ -0,0 +1,245 @@ +import functools +import threading +from multiprocessing import Process +from typing import Optional + +import httpx +import uvicorn +from fastapi import FastAPI, Request, Response +from fastapi.responses import StreamingResponse +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.routing import Match + +from .route import Route +from ..utilities.process.mp import SafeQueue + + +class FastAPIProxy: + ALL_REST_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"] + + def __init__(self, port, workers=None, default_target=None, log_level=None, access_log=None, enable_streaming=True): + self.app = None + self.routes = {} + self.port = port + self.message_queue = SafeQueue() + self.uvicorn_subprocess = None + self.workers = workers + self.access_log = access_log + self.log_level = None + self.enable_streaming = enable_streaming + self._default_target = default_target + self._default_session = None + self._in_subprocess = False + + def _create_default_route(self): + proxy = self + + class DefaultRouteMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + scope = { + "type": "http", + "method": request.method, + "path": request.url.path, + "root_path": "", + "headers": request.headers.raw, + "query_string": request.url.query.encode("utf-8"), + "client": request.client, + "server": request.scope.get("server"), + "scheme": request.url.scheme, + "extensions": request.scope.get("extensions", {}), + "app": request.scope.get("app"), + } + for route in proxy.app.router.routes: + if route.matches(scope)[0] == Match.FULL: + return await call_next(request) + proxied_response = 
await proxy._send_request( + request, proxy._default_target, proxy._default_target + request.url.path + ) + return await proxy._convert_httpx_response_to_fastapi(proxied_response) + + self.app.add_middleware(DefaultRouteMiddleware) + + async def proxy( + self, + request: Request, + path: Optional[str] = None, + source_path: Optional[str] = None, + ): + route_data = self.routes.get(source_path) + if not route_data: + return Response(status_code=404) + + request = await route_data.on_request(request) + try: + proxied_response = await self._send_request( + request, route_data.session, url=f"{route_data.target_url}/{path}" if path else route_data.target_url + ) + proxied_response = await self._convert_httpx_response_to_fastapi(proxied_response) + except Exception as e: + await route_data.on_error(request, e) + raise + return await route_data.on_response(proxied_response, request) + + async def _send_request(self, request, session, url): + if not self.enable_streaming: + proxied_response = await session.request( + method=request.method, + url=url, + headers=dict(request.headers), + content=await request.body(), + params=request.query_params + ) + else: + request = session.build_request( + method=request.method, + url=url, + content=request.stream(), + params=request.query_params, + headers=dict(request.headers), + timeout=httpx.USE_CLIENT_DEFAULT + ) + proxied_response = await session.send( + request=request, + auth=httpx.USE_CLIENT_DEFAULT, + follow_redirects=httpx.USE_CLIENT_DEFAULT, + stream=True, + ) + return proxied_response + + async def _convert_httpx_response_to_fastapi(self, httpx_response): + if self.enable_streaming and httpx_response.headers.get("transfer-encoding", "").lower() == "chunked": + + async def upstream_body_generator(): + async for chunk in httpx_response.aiter_bytes(): + yield chunk + + return StreamingResponse( + upstream_body_generator(), status_code=httpx_response.status_code, headers=dict(httpx_response.headers) + ) + if not 
self.enable_streaming: + content = httpx_response.content + else: + content = await httpx_response.aread() + fastapi_response = Response( + content=content, + status_code=httpx_response.status_code, + media_type=httpx_response.headers.get("content-type", None), + headers=dict(httpx_response.headers), + ) + # should delete content-length when not present in the original response + # relevant for: + # https://datatracker.ietf.org/doc/html/rfc9112#body.content-length:~:text=MUST%20NOT%20send%20a%20Content%2DLength%20header + if httpx_response.headers.get("content-length") is None: + try: + del fastapi_response.headers["content-length"] # no pop available + except Exception: + pass + return fastapi_response + + def add_route( + self, + source, + target, + request_callback=None, + response_callback=None, + error_callback=None, + endpoint_telemetry=True, + ): + if not self._in_subprocess: + self.message_queue.put( + { + "method": "add_route", + "kwargs": { + "source": source, + "target": target, + "request_callback": request_callback, + "response_callback": response_callback, + "error_callback": error_callback, + "endpoint_telemetry": endpoint_telemetry, + }, + } + ) + return + should_add_route = False + if source not in self.routes: + should_add_route = True + else: + self.routes[source].stop_endpoint_telemetry() + self.routes[source] = Route( + target, + request_callback=request_callback, + response_callback=response_callback, + error_callback=error_callback, + session=httpx.AsyncClient(timeout=None), + ) + if endpoint_telemetry is True: + endpoint_telemetry = {} + if endpoint_telemetry is not False: + self.routes[source].set_endpoint_telemetry_args(**endpoint_telemetry) + if self._in_subprocess: + self.routes[source].start_endpoint_telemetry() + if should_add_route: + self.app.add_api_route( + source, + functools.partial( + self.proxy, + source_path=source, + ), + methods=self.ALL_REST_METHODS, + ) + self.app.add_api_route( + source.rstrip("/") + "/{path:path}", + 
functools.partial( + self.proxy, + source_path=source, + ), + methods=self.ALL_REST_METHODS, + ) + return self.routes[source] + + def remove_route(self, source): + if not self._in_subprocess: + self.message_queue.put({"method": "remove_route", "kwargs": {"source": source}}) + return + route = self.routes.get(source) + if route: + route.stop_endpoint_telemetry() + if source in self.routes: + # we are not popping the key to prevent calling self.app.add_api_route multiple times + # when self.add_route is called on the same source_path after removal + self.routes[source] = None + + def _start(self): + self._in_subprocess = True + self.app = FastAPI() + if self._default_target: + self._default_session = httpx.AsyncClient(timeout=None) + self._create_default_route() + for route in self.routes.values(): + route.start_endpoint_telemetry() + threading.Thread(target=self._rpc_manager, daemon=True).start() + uvicorn.run( + self.app, + port=self.port, + host="0.0.0.0", + workers=self.workers, + log_level=self.log_level, + access_log=self.access_log, + ) + + def _rpc_manager(self): + while True: + message = self.message_queue.get() + if message["method"] == "add_route": + self.add_route(**message["kwargs"]) + elif message["method"] == "remove_route": + self.remove_route(**message["kwargs"]) + + def start(self): + self.uvicorn_subprocess = Process(target=self._start) + self.uvicorn_subprocess.start() + + def stop(self): + if self.uvicorn_subprocess: + self.uvicorn_subprocess.terminate() + self.uvicorn_subprocess = None diff --git a/clearml/router/proxy.py b/clearml/router/proxy.py new file mode 100644 index 00000000..93e07524 --- /dev/null +++ b/clearml/router/proxy.py @@ -0,0 +1,53 @@ +from .fastapi_proxy import FastAPIProxy + + +class HttpProxy: + DEFAULT_PORT = 9000 + + def __init__( + self, port=None, workers=None, default_target=None, log_level=None, access_log=True, enable_streaming=True + ): + # at the moment, only a fastapi proxy is supported + self.base_proxy = 
FastAPIProxy( + port or self.DEFAULT_PORT, + workers=workers, + default_target=default_target, + log_level=log_level, + access_log=access_log, + enable_streaming=enable_streaming, + ) + self.base_proxy.start() + self.port = port + self.routes = {} + + def add_route( + self, + source, + target, + request_callback=None, + response_callback=None, + endpoint_telemetry=True, + error_callback=None, + ): + self.routes[source] = self.base_proxy.add_route( + source=source, + target=target, + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry=endpoint_telemetry, + error_callback=error_callback, + ) + return self.routes[source] + + def remove_route(self, source): + self.routes.pop(source, None) + self.base_proxy.remove_route(source) + + def get_routes(self): + return self.routes + + def start(self): + self.base_proxy.start() + + def stop(self): + self.base_proxy.stop() diff --git a/clearml/router/route.py b/clearml/router/route.py new file mode 100644 index 00000000..7a962302 --- /dev/null +++ b/clearml/router/route.py @@ -0,0 +1,93 @@ +import inspect +from .endpoint_telemetry import EndpointTelemetry + + +class Route: + def __init__(self, target_url, request_callback=None, response_callback=None, session=None, error_callback=None): + self.target_url = target_url + self.request_callback = request_callback + self.response_callback = response_callback + self.error_callback = error_callback + self.session = session + self.persistent_state = {} + self._endpoint_telemetry = None + self._endpoint_telemetry_args = None + + def set_endpoint_telemetry_args( + self, + endpoint_name="endpoint", + model_name="model", + model=None, + model_url=None, + model_source=None, + model_version=None, + app_id=None, + app_instance=None, + tags=None, + system_tags=None, + container_id=None, + input_size=None, + input_type="str", + report_statistics=True, + endpoint_url=None, + preprocess_artifact=None, + force_register=False + ): + 
self._endpoint_telemetry_args = dict( + endpoint_name=endpoint_name, + model_name=model_name, + model=model, + model_url=model_url, + model_source=model_source, + model_version=model_version, + app_id=app_id, + app_instance=app_instance, + tags=tags, + system_tags=system_tags, + container_id=container_id, + input_size=input_size, + input_type=input_type, + report_statistics=report_statistics, + endpoint_url=endpoint_url, + preprocess_artifact=preprocess_artifact, + force_register=force_register + ) + + def start_endpoint_telemetry(self): + if self._endpoint_telemetry is not None or self._endpoint_telemetry_args is None: + return + self._endpoint_telemetry = EndpointTelemetry(**self._endpoint_telemetry_args) + + def stop_endpoint_telemetry(self): + if self._endpoint_telemetry is None: + return + self._endpoint_telemetry.stop() + self._endpoint_telemetry = None + + async def on_request(self, request): + new_request = request + if self.request_callback: + new_request = self.request_callback(request, persistent_state=self.persistent_state) or request + if inspect.isawaitable(new_request): + new_request = (await new_request) or request + if self._endpoint_telemetry: + self._endpoint_telemetry.on_request() + return new_request + + async def on_response(self, response, request): + new_response = response + if self.response_callback: + new_response = self.response_callback(response, request, persistent_state=self.persistent_state) or response + if inspect.isawaitable(new_response): + new_response = (await new_response) or response + if self._endpoint_telemetry: + self._endpoint_telemetry.on_response() + return new_response + + async def on_error(self, request, error): + on_error_result = None + if self.error_callback: + on_error_result = self.error_callback(request, error, persistent_state=self.persistent_state) + if inspect.isawaitable(on_error_result): + await on_error_result + return on_error_result diff --git a/clearml/router/router.py b/clearml/router/router.py new 
file mode 100644 index 00000000..7f3a9117 --- /dev/null +++ b/clearml/router/router.py @@ -0,0 +1,220 @@ +from typing import Optional, Callable, Dict, Union # noqa +from fastapi import Request, Response # noqa +from .proxy import HttpProxy + + +class HttpRouter: + """ + A router class to manage HTTP routing for an application. + Allows the creation, deployment, and management of local and external endpoints, + as well as the configuration of a local proxy for traffic routing. + + Example usage: + + .. code-block:: py + def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify": + new_content = response.body.replace(b"modify", b"modified") + headers = copy.deepcopy(response.headers) + headers["Content-Length"] = str(len(new_content)) + return Response(status_code=response.status_code, headers=headers, content=new_content) + + router = Task.current_task().get_http_router() + router.set_local_proxy_parameters(incoming_port=9000) + router.create_local_route( + source="/", + target="http://localhost:8000", + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry={"model": "MyModel"} + ) + router.deploy(wait=True) + """ + + _instance = None + + def __init__(self, task): + """ + Do not use directly. 
Use `Task.get_router` instead + """ + self._task = task + self._external_endpoint_port = None + self._proxy = None + self._proxy_params = {"port": HttpProxy.DEFAULT_PORT, "access_log": True} + + def set_local_proxy_parameters( + self, incoming_port=None, default_target=None, log_level=None, access_log=True, enable_streaming=True + ): + # type: (Optional[int], Optional[str], Optional[str], bool) -> () + """ + Set the parameters with which the local proxy is initialized + + :param incoming_port: The incoming port of the proxy + :param default_target: If None, no default target is set. Otherwise, route all traffic + that doesn't match a local route created via `create_local_route` to this target + :param log_level: Python log level for the proxy, one of: + 'critical', 'error', 'warning', 'info', 'debug', 'trace' + :param access_log: Enable/Disable access log + :param enable_streaming: If True, enable streaming of responses with the `transfer-encoding` header set. + If False, no response will be streamed + """ + self._proxy_params["port"] = incoming_port or HttpProxy.DEFAULT_PORT + self._proxy_params["default_target"] = default_target + self._proxy_params["log_level"] = log_level + self._proxy_params["access_log"] = access_log + self._proxy_params["enable_streaming"] = enable_streaming + + def start_local_proxy(self): + """ + Start the local proxy without deploying the router, i.e. requesting an external endpoint + """ + self._proxy = self._proxy or HttpProxy(**self._proxy_params) + + def create_local_route( + self, + source, # type: str + target, # type: str + request_callback=None, # type: Callable[Request, Dict] + response_callback=None, # type: Callable[Response, Request, Dict] + endpoint_telemetry=True, # type: Union[bool, Dict] + error_callback=None, # type: Callable[Request, Exception, Dict] + ): + """ + Create a local route from a source to a target through a proxy. If no proxy instance + exists, one is automatically created. 
+ This function enables routing traffic between the source and target endpoints, optionally + allowing custom callbacks to handle requests and responses or to gather telemetry data + at the endpoint. + To customize proxy parameters, use the `Router.set_local_proxy_parameters` method. + By default, the proxy binds to port 9000 for incoming requests. + + :param source: The source path for routing the traffic. For example, `/` will intercept + all the traffic sent to the proxy, while `/example` will only intercept the calls + that have `/example` as the path prefix. + :param target: The target URL where the intercepted traffic is routed. + :param request_callback: A function used to process each request before it is forwarded to the target. + The callback must have the following parameters: + - request - The intercepted FastAPI request + - persistent_state - A dictionary meant to be used as a caching utility object. + Shared with `response_callback` and `error_callback` + The callback can return a FastAPI Request, in which case this request will be forwarded to the target + :param response_callback: A function used to process each response before it is returned by the proxy. + The callback must have the following parameters: + - response - The FastAPI response + - request - The FastAPI request (after being preprocessed by the proxy) + - persistent_state - A dictionary meant to be used as a caching utility object. + Shared with `request_callback` and `error_callback` + The callback can return a FastAPI Response, in which case this response will be forwarded + :param endpoint_telemetry: If True, enable endpoint telemetry. If False, disable it. + If a dictionary is passed, enable endpoint telemetry with custom parameters. 
+ The parameters are: + - endpoint_url - URL to the endpoint, mandatory if no external URL has been requested + - endpoint_name - name of the endpoint + - model_name - name of the model served by the endpoint + - model - referenced model + - model_url - URL to the model + - model_source - Source of the model + - model_version - Model version + - app_id - App ID, if used inside a ClearML app + - app_instance - The ID of the instance the ClearML app is running + - tags - ClearML tags + - system_tags - ClearML system tags + - container_id - Container ID, should be unique + - input_size - input size of the model + - input_type - input type expected by the model/endpoint + - report_statistics - whether or not to report statistics + :param error_callback: Callback to be called on request error. + The callback must have the following parameters: + - request - the FastAPI request which caused the error + - error - an exception which indicates which error occurred + - persistent_state - A dictionary meant to be used as a caching utility object. + Shared with `request_callback` and `response_callback` + """ + self.start_local_proxy() + self._proxy.add_route( + source, + target, + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry=endpoint_telemetry, + error_callback=error_callback, + ) + + def remove_local_route(self, source): + # type: (str) -> () + """ + Remove a local route. If endpoint telemetry is enabled for that route, disable it + + :param source: Remove route based on the source path used to route the traffic + """ + if self._proxy: + self._proxy.remove_route(source) + + def deploy(self, wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0): + # type: (Optional[int], str, bool, float, float) -> Optional[Dict] + """ + Start the local HTTP proxy and request an external endpoint for an application + + :param port: Port the application is listening to. If no port is supplied, a local proxy + will be created. 
To control the proxy parameters, use `Router.set_local_proxy_parameters`. + To create local routes through the proxy, use `Router.create_local_route`. + By default, the incoming port bound by the proxy is 9000 + :param protocol: As of now, only `http` is supported + :param wait: If True, wait for the endpoint to be assigned + :param wait_interval_seconds: The poll frequency when waiting for the endpoint + :param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, + the method will no longer wait and None will be returned + + :return: If wait is False, this method will return None. + If no endpoint could be found while waiting, this method returns None. + Otherwise, it returns a dictionary containing the following values: + - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpoint + """ + self._proxy = self._proxy or HttpProxy(**self._proxy_params) + return self._task.request_external_endpoint( + port=self._proxy.port, + protocol="http", + wait=wait, + wait_interval_seconds=wait_interval_seconds, + wait_timeout_seconds=wait_timeout_seconds, + ) + + def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0): + # type: (float) -> Optional[Dict] + """ + Wait for an external endpoint to be assigned + + :param wait_interval_seconds: The poll frequency when waiting for the endpoint + :param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, + the method will no longer wait + + :return: If no endpoint could be found while waiting, this method returns None. + Otherwise, it returns a dictionary containing the following values: + - endpoint - raw endpoint.
One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpoint + """ + return self._task.wait_for_external_endpoint( + protocol="http", wait_interval_seconds=wait_interval_seconds, wait_timeout_seconds=wait_timeout_seconds + ) + + def list_external_endpoints(self): + # type: () -> List[Dict] + """ + List all external endpoints assigned + + :return: A list of dictionaries. Each dictionary contains the following values: + - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint + - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser + - port - the port exposed by the application + - protocol - the protocol used by the endpoint + """ + return self._task.list_external_endpoints(protocol="http") diff --git a/clearml/storage/cache.py b/clearml/storage/cache.py index ec5f319d..2ac92436 100644 --- a/clearml/storage/cache.py +++ b/clearml/storage/cache.py @@ -20,9 +20,7 @@ from ..utilities.files import get_filename_max_length class CacheManager(object): __cache_managers = {} - _default_cache_file_limit = deferred_config( - "storage.cache.default_cache_manager_size", 100 - ) + _default_cache_file_limit = deferred_config("storage.cache.default_cache_manager_size", 100) _storage_manager_folder = "storage_manager" _default_context = "global" _local_to_remote_url_lookup = OrderedDict() @@ -48,9 +46,7 @@ class CacheManager(object): self._file_limit = max(self._file_limit, int(cache_file_limit)) return self._file_limit - def get_local_copy( - self, remote_url, force_download, skip_zero_size_check=False - ): + def get_local_copy(self, remote_url, force_download, skip_zero_size_check=False): # type: (str, bool, bool) -> Optional[str] helper = StorageHelper.get(remote_url) @@ -64,9 +60,7 @@ class 
CacheManager(object): # noinspection PyProtectedMember direct_access = helper.get_driver_direct_access(remote_url) except (OSError, ValueError): - LoggerRoot.get_base_logger().debug( - "Failed accessing local file: {}".format(remote_url) - ) + LoggerRoot.get_base_logger().debug("Failed accessing local file: {}".format(remote_url)) return None if direct_access: @@ -77,6 +71,8 @@ class CacheManager(object): if cached_size is not None and not force_download: CacheManager._add_remote_url(remote_url, cached_file) return cached_file + + self.clean_cache() # we need to download the file: downloaded_file = helper.download_to_file( remote_url, @@ -131,12 +127,12 @@ class CacheManager(object): file_ext = "".join(Path(file_name).suffixes[-2:]) file_ext = file_ext.rstrip(" ") - file_basename = file_name[:-len(file_ext)] + file_basename = file_name[: -len(file_ext)] file_basename = file_basename.strip() # Omit characters from extensionss if len(file_ext) > allowed_length: - file_ext = file_ext[-(allowed_length - 1):] + file_ext = file_ext[-(allowed_length - 1) :] file_ext = "." 
+ file_ext.lstrip(".") # Updating maximum character length @@ -159,9 +155,7 @@ class CacheManager(object): """ :return: full path to current contexts cache folder """ - folder = Path( - get_cache_dir() / CacheManager._storage_manager_folder / self._context - ) + folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context) return folder.as_posix() def get_cache_file(self, remote_url=None, local_filename=None): @@ -171,32 +165,7 @@ class CacheManager(object): :param local_filename: if local_file is given, search for the local file/directory in the cache folder :return: full path to file name, current file size or None """ - - def safe_time(x): - # noinspection PyBroadException - try: - return x.stat().st_mtime - except Exception: - return 0 - - def sort_max_access_time(x): - atime = safe_time(x) - # noinspection PyBroadException - try: - if x.is_dir(): - dir_files = list(x.iterdir()) - atime = ( - max(atime, max(safe_time(s) for s in dir_files)) - if dir_files - else atime - ) - except Exception: - pass - return atime - - folder = Path( - get_cache_dir() / CacheManager._storage_manager_folder / self._context - ) + folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context) folder.mkdir(parents=True, exist_ok=True) local_filename = local_filename or self.get_hashed_url_file(remote_url) local_filename = self._conform_filename(local_filename) @@ -215,23 +184,45 @@ class CacheManager(object): except Exception: new_file_size = None + return new_file.as_posix(), new_file_size + + def clean_cache(self): + # type: () -> bool + """ + If cache is full, clean it by deleting old/lock files + + :return: True if the cache has been cleaned and False otherwise + """ + def safe_time(x): + # noinspection PyBroadException + try: + return x.stat().st_mtime + except Exception: + return 0 + + def sort_max_access_time(x): + atime = safe_time(x) + # noinspection PyBroadException + try: + if x.is_dir(): + dir_files = list(x.iterdir()) + 
atime = max(atime, max(safe_time(s) for s in dir_files)) if dir_files else atime + except Exception: + pass + return atime + + folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context) folder_files = list(folder.iterdir()) if len(folder_files) <= self._file_limit: - return new_file.as_posix(), new_file_size + return False # first exclude lock files lock_files = dict() files = [] for f in sorted(folder_files, reverse=True, key=sort_max_access_time): - if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith( - CacheManager._lockfile_suffix - ): + if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix): # parse the lock filename - name = f.name[ - len(CacheManager._lockfile_prefix):-len( - CacheManager._lockfile_suffix - ) - ] + name = f.name[len(CacheManager._lockfile_prefix) : -len(CacheManager._lockfile_suffix)] num, _, name = name.partition(".") lock_files[name] = lock_files.get(name, []) + [f.as_posix()] else: @@ -242,7 +233,7 @@ class CacheManager(object): lock_files.pop(f.name, None) # delete old files - files = files[self._file_limit:] + files = files[self._file_limit :] for f in files: # check if the file is in the lock folder list: folder_lock = self._folder_locks.get(f.absolute().as_posix()) @@ -279,9 +270,7 @@ class CacheManager(object): shutil.rmtree(f.as_posix(), ignore_errors=False) except Exception as e: # failed deleting folder - LoggerRoot.get_base_logger().debug( - "Exception {}\nFailed deleting folder {}".format(e, f) - ) + LoggerRoot.get_base_logger().debug("Exception {}\nFailed deleting folder {}".format(e, f)) # cleanup old lock files for lock_files in lock_files.values(): @@ -291,8 +280,7 @@ class CacheManager(object): os.unlink(f) except BaseException: pass - - return new_file.as_posix(), new_file_size + return True def lock_cache_folder(self, local_path): # type: (Union[str, Path]) -> () @@ -382,9 +370,7 @@ class CacheManager(object): except Exception: 
return local_copy_path - return CacheManager._local_to_remote_url_lookup.get( - hash(conform_local_copy_path), local_copy_path - ) + return CacheManager._local_to_remote_url_lookup.get(hash(conform_local_copy_path), local_copy_path) @staticmethod def _add_remote_url(remote_url, local_copy_path): @@ -411,10 +397,7 @@ class CacheManager(object): pass CacheManager._local_to_remote_url_lookup[hash(local_copy_path)] = remote_url # protect against overuse, so we do not blowup the memory - if ( - len(CacheManager._local_to_remote_url_lookup) - > CacheManager.__local_to_remote_url_lookup_max_size - ): + if len(CacheManager._local_to_remote_url_lookup) > CacheManager.__local_to_remote_url_lookup_max_size: # pop the first item (FIFO) CacheManager._local_to_remote_url_lookup.popitem(last=False) @@ -429,6 +412,4 @@ class CacheManager(object): # type: (Optional[str]) -> str if not context: return cls._default_context_folder_template - return cls._context_to_folder_lookup.get( - str(context), cls._default_context_folder_template - ) + return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template) diff --git a/clearml/task.py b/clearml/task.py index fede03d5..215ffea4 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -113,6 +113,7 @@ if TYPE_CHECKING: import pandas import numpy from PIL import Image + from .router.router import HttpRouter # Forward declaration to help linters TaskInstance = TypeVar("TaskInstance", bound="Task") @@ -185,6 +186,11 @@ class Task(_Task): _launch_multi_node_section = "launch_multi_node" _launch_multi_node_instance_tag = "multi_node_instance" + _external_endpoint_port_map = {"http": "_PORT", "tcp": "external_tcp_port"} + _external_endpoint_address_map = {"http": "_ADDRESS", "tcp": "external_address"} + _external_endpoint_service_map = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"} + _external_endpoint_internal_port_map = {"http": "_PORT", "tcp": "upstream_task_port"} + class _ConnectedParametersType(object): argparse = 
"argument_parser" dictionary = "dictionary" @@ -218,6 +224,8 @@ class Task(_Task): self._resource_monitor = None self._calling_filename = None self._remote_functions_generated = {} + self._external_endpoint_ports = {} + self._http_router = None # register atexit, so that we mark the task as stopped self._at_exit_called = False @@ -842,6 +850,49 @@ class Task(_Task): task._set_startup_info() return task + def get_http_router(self): + # type: () -> HttpRouter + """ + Retrieve an instance of `HttpRouter` to manage an external HTTP endpoint and intercept traffic. + The `HttpRouter` serves as a traffic manager, enabling the creation and configuration of local and external + routesto redirect, monitor, or manipulate HTTP requests and responses. It is designed to handle routing + needs such via a proxy setup which handles request/response interception and telemetry reporting for + applications that require HTTP endpoint management. + + Example usage: + + .. code-block:: py + def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify": + new_content = response.body.replace(b"modify", b"modified") + headers = copy.deepcopy(response.headers) + headers["Content-Length"] = str(len(new_content)) + return Response(status_code=response.status_code, headers=headers, content=new_content) + + router = Task.current_task().get_http_router() + router.set_local_proxy_parameters(incoming_port=9000) + router.create_local_route( + source="/", + target="http://localhost:8000", + request_callback=request_callback, + response_callback=response_callback, + endpoint_telemetry={"model": "MyModel"} + ) + router.deploy(wait=True) + """ + try: + from .router.router import HttpRouter # noqa + except ImportError: + raise UsageError("Could not import 
`HttpRouter`. Please run `pip install clearml[router]`") + + if self._http_router is None: + self._http_router = HttpRouter(self) + return self._http_router + def request_external_endpoint( self, port, protocol="http", wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0 ): @@ -850,14 +901,14 @@ class Task(_Task): Request an external endpoint for an application :param port: Port the application is listening to - :param protocol: As of now, only `http` is supported + :param protocol: `http` or `tcp` :param wait: If True, wait for the endpoint to be assigned :param wait_interval_seconds: The poll frequency when waiting for the endpoint :param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, the method will no longer wait and None will be returned :return: If wait is False, this method will return None. - If no endpoint could be found while waiting, this mehtod returns None. + If no endpoint could be found while waiting, this method returns None. Otherwise, it returns a dictionary containing the following values: - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser @@ -865,75 +916,151 @@ class Task(_Task): - protocol - the protocol used by the endpoint """ Session.verify_feature_set("advanced") - if not getattr(self, "_external_endpoint_port", None): + if protocol not in self._external_endpoint_port_map.keys(): + raise ValueError("Invalid protocol: {}".format(protocol)) + + # sync with router - get data from Task + if not self._external_endpoint_ports.get(protocol): self.reload() - assigned_port = self._get_runtime_properties().get("_PORT") - if assigned_port: - self._external_endpoint_port = assigned_port - if getattr(self, "_external_endpoint_port", None): - if self._external_endpoint_port != port: # noqa - raise ValueError( - "Only one endpoint can be requested at the moment. 
Port already exposed is: {}".format( - self._external_endpoint_port - ) + internal_port = self._get_runtime_properties().get(self._external_endpoint_internal_port_map[protocol]) + if internal_port: + self._external_endpoint_ports[protocol] = internal_port + + # check if we are trying to change the port - currently not allowed + if self._external_endpoint_ports.get(protocol): + if self._external_endpoint_ports.get(protocol) == port: + # we already set this endpoint, so do nothing + return + + raise ValueError( + "Only one endpoint per protocol can be requested at the moment. Port already exposed is: {}".format( + self._external_endpoint_ports.get(protocol) ) - return + ) + + # mark for the router our request # noinspection PyProtectedMember self._set_runtime_properties( - {"_SERVICE": "EXTERNAL", "_ADDRESS": get_private_ip(), "_PORT": port} + { + "_SERVICE": self._external_endpoint_service_map[protocol], + self._external_endpoint_address_map[protocol]: get_private_ip(), + self._external_endpoint_port_map[protocol]: port, + } ) + # required system_tag for the router to catch the routing request self.set_system_tags((self.get_system_tags() or []) + ["external_service"]) - self._external_endpoint_port = port + self._external_endpoint_ports[protocol] = port if wait: - return self.wait_for_external_endpoint(wait_interval_seconds=wait_interval_seconds) + return self.wait_for_external_endpoint( + wait_interval_seconds=wait_interval_seconds, + wait_timeout_seconds=wait_timeout_seconds, + protocol=protocol + ) return None - def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0): - # type: (float) -> Optional[Dict] + def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0, protocol="http"): + # type: (float, float, Optional[str]) -> Union[Optional[Dict], List[Optional[Dict]]] """ Wait for an external endpoint to be assigned :param wait_interval_seconds: The poll frequency when waiting for the endpoint 
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint, the method will no longer wait + :param protocol: `http` or `tcp`. Wait for an endpoint to be assigned based on the protocol. + If None, wait for all supported protocols - :return: If no endpoint could be found while waiting, this mehtod returns None. - Otherwise, it returns a dictionary containing the following values: + :return: If no endpoint could be found while waiting, this method returns None. + If a protocol has been specified, it returns a dictionary containing the following values: - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser - port - the port exposed by the application - protocol - the protocol used by the endpoint + If no protocol is specified, it returns a list of dictionaries containing the values above, + for each protocol requested and waited for """ Session.verify_feature_set("advanced") - if not getattr(self, "_external_endpoint_port", None): - LoggerRoot.get_base_logger().warning("No external endpoints have been requested") + if protocol: + return self._wait_for_external_endpoint( + wait_interval_seconds=wait_interval_seconds, + wait_timeout_seconds=wait_timeout_seconds, + protocol=protocol, + warn=True + ) + results = [] + protocols = ["http", "tcp"] + waited_protocols = [] + for protocol_ in protocols: + start_time = time.time() + result = self._wait_for_external_endpoint( + wait_interval_seconds=wait_interval_seconds, + wait_timeout_seconds=wait_timeout_seconds, + protocol=protocol_, + warn=False, + ) + elapsed = time.time() - start_time + if result: + results.append(result) + wait_timeout_seconds -= elapsed + if wait_timeout_seconds > 0 or result: + waited_protocols.append(protocol_) + unwaited_protocols = [p for p in protocols if p not in waited_protocols] + if wait_timeout_seconds <= 0 and unwaited_protocols: + 
LoggerRoot.get_base_logger().warning( + "Timeout exceeded while waiting for {} endpoint(s)".format(",".join(unwaited_protocols)) + ) + return results + + def _wait_for_external_endpoint( + self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0, protocol="http", warn=True + ): + if not self._external_endpoint_ports.get(protocol): + self.reload() + internal_port = self._get_runtime_properties().get(self._external_endpoint_internal_port_map[protocol]) + if internal_port: + self._external_endpoint_ports[protocol] = internal_port + if not self._external_endpoint_ports.get(protocol): + if warn: + LoggerRoot.get_base_logger().warning("No external {} endpoints have been requested".format(protocol)) return None start_time = time.time() while True: self.reload() - # noinspection PyProtectedMember runtime_props = self._get_runtime_properties() - endpoint = runtime_props.get("endpoint") - browser_endpoint = runtime_props.get("browser_endpoint") - if not getattr(self, "_external_endpoint_port", None): - self._external_endpoint_port = runtime_props.get("_PORT") + endpoint, browser_endpoint = None, None + if protocol == "http": + endpoint = runtime_props.get("endpoint") + browser_endpoint = runtime_props.get("browser_endpoint") + elif protocol == "tcp": + health_check = runtime_props.get("upstream_task_port") + if health_check: + endpoint = ( + runtime_props.get(self._external_endpoint_address_map[protocol]) + + ":" + + str(runtime_props.get(self._external_endpoint_port_map[protocol])) + ) if endpoint or browser_endpoint: return { "endpoint": endpoint, "browser_endpoint": browser_endpoint, - "port": self._external_endpoint_port, - "protocol": "http", + "port": self._external_endpoint_ports[protocol], + "protocol": protocol, } if time.time() >= start_time + wait_timeout_seconds: - LoggerRoot.get_base_logger().warning("Timeout exceeded while waiting for endpoint") + if warn: + LoggerRoot.get_base_logger().warning( + "Timeout exceeded while waiting for {} 
endpoint".format(protocol) + ) return None time.sleep(wait_interval_seconds) - def list_external_endpoints(self): - # type: () -> List[Dict] + def list_external_endpoints(self, protocol=None): + # type: (Optional[str]) -> List[Dict] """ List all external endpoints assigned + :param protocol: If None, list all external endpoints. Otherwise, only list endpoints + that use this protocol + :return: A list of dictionaries. Each dictionary contains the following values: - endpoint - raw endpoint. One might need to authenticate in order to use this endpoint - browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser @@ -941,23 +1068,37 @@ class Task(_Task): - protocol - the protocol used by the endpoint """ Session.verify_feature_set("advanced") - if not getattr(self, "_external_endpoint_port", None): - self.reload() - self._external_endpoint_port = self._get_runtime_properties().get("_PORT") - if not getattr(self, "_external_endpoint_port", None): - LoggerRoot.get_base_logger().warning("No external endpoints have been requested") - return [] runtime_props = self._get_runtime_properties() - endpoint = runtime_props.get("endpoint") - browser_endpoint = runtime_props.get("browser_endpoint") - return [ - { - "endpoint": endpoint, - "browser_endpoint": browser_endpoint, - "port": self._external_endpoint_port, - "protocol": "http", - } - ] + results = [] + protocols = [protocol] if protocol is not None else ["http", "tcp"] + for protocol in protocols: + internal_port = runtime_props.get(self._external_endpoint_internal_port_map[protocol]) + if internal_port: + self._external_endpoint_ports[protocol] = internal_port + else: + continue + endpoint, browser_endpoint = None, None + if protocol == "http": + endpoint = runtime_props.get("endpoint") + browser_endpoint = runtime_props.get("browser_endpoint") + elif protocol == "tcp": + health_check = runtime_props.get("upstream_task_port") + if health_check: + endpoint = ( + 
runtime_props.get(self._external_endpoint_address_map[protocol]) + + ":" + + str(runtime_props.get(self._external_endpoint_port_map[protocol])) + ) + if endpoint or browser_endpoint: + results.append( + { + "endpoint": endpoint, + "browser_endpoint": browser_endpoint, + "port": internal_port, + "protocol": protocol, + } + ) + return results @classmethod def create( @@ -2267,6 +2408,26 @@ class Task(_Task): # mark task as stopped self.stopped(force=force, status_message=str(status_message) if status_message else None) + def mark_stop_request(self, force=False, status_message=None): + # type: (bool, Optional[str]) -> () + """ + Request a task to stop. this will not change the task status + but mark a request for an agent or SDK to actually stop the Task. + This will trigger the Task's abort callback, and at the end will + change the task status to stopped and kill the Task's processes + + Notice: calling this on your own Task, will cause + the watchdog to call the on_abort callback and kill the process + + :param bool force: If not True, call fails if the task status is not 'in_progress' + :param str status_message: Optional, add status change message to the stop request. + This message will be stored as status_message on the Task's info panel + """ + # flush any outstanding logs + self.flush(wait_for_uploads=True) + # request task stop + return self.stop_request(self, force=force, status_message=status_message) + def flush(self, wait_for_uploads=False): # type: (bool) -> bool """ @@ -2523,7 +2684,7 @@ class Task(_Task): - PIL.Image - whatever extensions PIL supports (default ``.png``) - In case the ``serialization_function`` argument is set - any extension is supported - :param Callable[Any, Union[bytes, bytearray]] serialization_function: A serialization function that takes one + :param serialization_function: A serialization function that takes one parameter of any type which is the object to be serialized. 
The function should return a `bytes` or `bytearray` object, which represents the serialized object. Note that the object will be immediately serialized using this function, thus other serialization methods will not be used diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 4b9b831c..12150f3c 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -281,12 +281,16 @@ class SafeQueue(object): # Fix the python Queue and Use SimpleQueue write so it uses a single OS write, # making it atomic message passing self._q = SimpleQueue(*args, **kwargs) - # noinspection PyBroadException - try: - # noinspection PyUnresolvedReferences,PyProtectedMember - self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer) - except Exception: - pass + + # on Windows, queue communication is done via pipes, no need to override the _send_bytes method + if sys.platform != 'win32': + # noinspection PyBroadException + try: + # noinspection PyUnresolvedReferences,PyProtectedMember + self._q._writer._send_bytes = partial(SafeQueue._pipe_override_send_bytes, self._q._writer) + except Exception: + pass + self._internal_q = None # Note we should Never! assign a new object to `self._q_size`, just work with the initial object self._q_size = [] # list of PIDs we pushed, so this is atomic. diff --git a/clearml/version.py b/clearml/version.py index 9e1406d5..c7bbe496 100644 --- a/clearml/version.py +++ b/clearml/version.py @@ -1 +1 @@ -__version__ = "1.16.5" +__version__ = "1.17.1" diff --git a/examples/router/http_router.py b/examples/router/http_router.py new file mode 100644 index 00000000..c3ba3689 --- /dev/null +++ b/examples/router/http_router.py @@ -0,0 +1,42 @@ +""" +Example on how to use the ClearML HTTP router. +For this example, you would first need a webserver to route the traffic to: +`simple_webserver.py` launches such a server. Running the script will start a +webserver, bound to localhost:8000. 
+ +Then, when running this example, it creates a router which binds to 0.0.0.0:9000. +A local route is then created, which will proxy all traffic from +`http://:9000/example_source` to `http://localhost:8000/serve`. + +Traffic can be intercepted both on request and response via callbacks. See +`request_callback` and `response_callback`. + +By default, the route traffic is monitored and telemetry is sent to the ClearML +server. To disable this, pass `endpoint_telemetry=False` when creating the route +""" + +import time +from clearml import Task + + +def request_callback(request, persistent_state): + persistent_state["last_request_time"] = time.time() + + +def response_callback(response, request, persistent_state): + print("Latency:", time.time() - persistent_state["last_request_time"]) + + +if __name__ == "__main__": + task = Task.init(project_name="Router Example", task_name="Router Example") + router = task.get_http_router() + router.set_local_proxy_parameters(incoming_port=9000, default_target="http://localhost:8000") + router.create_local_route( + source="/example_source", + target="http://localhost:8000/serve", # route traffic to this address + request_callback=request_callback, # intercept requests + response_callback=response_callback, # intercept responses + endpoint_telemetry={"model": "MyModel"} # set this to False to disable telemetry + ) + router.deploy(wait=True) + # run `curl http://localhost:9000/example_source/1` diff --git a/examples/router/requirements.txt b/examples/router/requirements.txt new file mode 100644 index 00000000..7792a35a --- /dev/null +++ b/examples/router/requirements.txt @@ -0,0 +1,2 @@ +clearml +clearml[router] diff --git a/examples/router/simple_webserver.py b/examples/router/simple_webserver.py new file mode 100644 index 00000000..851b87a4 --- /dev/null +++ b/examples/router/simple_webserver.py @@ -0,0 +1,44 @@ +""" +A simple webserver, used as a tool to showcase the capabilities of +ClearML HTTP router. 
See `http_router.py` for more details. +""" + + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import uvicorn + +app = FastAPI() + +actions = { + "1": {"name": "Action 1", "description": "This is model action 1"}, + "2": {"name": "Action 2", "description": "This is model action 2"}, + "3": {"name": "Action 3", "description": "This is model action 3"}, +} + + +class Item(BaseModel): + name: str + description: str + + +@app.get("/") +def read_root(): + return {"message": "Welcome to the FastAPI application!"} + + +@app.get("/serve/{action}", response_model=Item) +def read_item(action: str): + if action in actions: + return actions[action] + else: + raise HTTPException(status_code=404, detail="Item not found") + + +if __name__ == "__main__": + uvicorn.run( + "simple_webserver:app", + host="127.0.0.1", + port=8000, + reload=True + ) diff --git a/requirements.txt b/requirements.txt index 68a298c2..b7e29016 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,7 @@ Pillow>=4.1.1 psutil>=3.4.2 pyparsing>=2.0.3 python-dateutil>=2.6.1 -pyjwt>=2.4.0,<2.9.0 ; python_version > '3.5' +pyjwt>=2.4.0,<2.10.0 ; python_version > '3.5' pyjwt>=1.6.4,<2.0.0 ; python_version <= '3.5' PyYAML>=3.12 referencing<0.40 ; python_version >= '3.8' diff --git a/setup.py b/setup.py index 79a3ce7e..fb9aff84 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ """ -ClearML - Artificial Intelligence Version Control -https://github.com/allegroai/clearml +ClearML Inc +https://github.com/clearml/clearml """ import os.path @@ -21,8 +21,7 @@ long_description = read_text(os.path.join(here, 'README.md')) # fix github, dark logo hack. long_description = long_description.replace( """Clear|MLClear|ML""", # noqa - """""", # noqa - 1 + """""", # noqa ) @@ -46,7 +45,7 @@ setup( long_description=long_description, long_description_content_type='text/markdown', # The project's main homepage. 
- url='https://github.com/allegroai/clearml', + url='https://github.com/clearml/clearml', author='ClearML', author_email='support@clear.ml', license='Apache License 2.0', @@ -71,6 +70,7 @@ setup( 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', 'License :: OSI Approved :: Apache Software License', ], keywords='clearml trains development machine deep learning version control machine-learning machinelearning ' @@ -87,6 +87,11 @@ setup( 'gs': [ 'google-cloud-storage>=1.13.2', ], + 'router': [ + 'fastapi>=0.115.2', + 'uvicorn>=0.31.1', + 'httpx>=0.27.2' + ] }, package_data={ 'clearml': ['config/default/*.conf', 'backend_api/config/default/*.conf']