This commit is contained in:
revital
2024-05-29 09:31:10 +03:00
36 changed files with 2095 additions and 580 deletions

View File

@@ -10,6 +10,9 @@
[![GitHub license](https://img.shields.io/github/license/allegroai/clearml.svg)](https://img.shields.io/github/license/allegroai/clearml.svg) [![PyPI pyversions](https://img.shields.io/pypi/pyversions/clearml.svg)](https://img.shields.io/pypi/pyversions/clearml.svg) [![PyPI version shields.io](https://img.shields.io/pypi/v/clearml.svg)](https://pypi.org/project/clearml/) [![Conda version shields.io](https://img.shields.io/conda/v/clearml/clearml)](https://anaconda.org/clearml/clearml) [![Optuna](https://img.shields.io/badge/Optuna-integrated-blue)](https://optuna.org)<br>
[![PyPI Downloads](https://static.pepy.tech/badge/clearml/month)](https://pypi.org/project/clearml/) [![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai) [![Youtube](https://img.shields.io/badge/ClearML-DD0000?logo=youtube&logoColor=white)](https://www.youtube.com/c/clearml) [![Slack Channel](https://img.shields.io/badge/slack-%23clearml--community-blueviolet?logo=slack)](https://joinslack.clear.ml) [![Signup](https://img.shields.io/badge/Clear%7CML-Signup-brightgreen)](https://app.clear.ml)
`🌟 ClearML is open-source - Leave a star to support the project! 🌟`
</div>
---
@@ -27,7 +30,8 @@ ClearML is a ML/DL development and production suite. It contains FIVE main modul
- Includes optimized GPU serving support backed by Nvidia-Triton
- **with out-of-the-box Model Monitoring**
- [Reports](https://clear.ml/docs/latest/docs/webapp/webapp_reports) - Create and share rich MarkDown documents supporting embeddable online content
- **NEW** :fire: [Orchestration Dashboard](https://clear.ml/docs/latest/docs/webapp/webapp_orchestration_dash/) - Live rich dashboard for your entire compute cluster (Cloud / Kubernetes / On-Prem)
- :fire: [Orchestration Dashboard](https://clear.ml/docs/latest/docs/webapp/webapp_orchestration_dash/) - Live rich dashboard for your entire compute cluster (Cloud / Kubernetes / On-Prem)
- **NEW** 💥 [Fractional GPUs](https://github.com/allegroai/clearml-fractional-gpu) - Container based, driver level GPU memory limitation 🙀 !!!
Instrumenting these components is the **ClearML-server**, see [Self-Hosting](https://clear.ml/docs/latest/docs/deploying_clearml/clearml_server) & [Free tier Hosting](https://app.clear.ml)

View File

@@ -198,6 +198,13 @@ class AutoScaler(object):
instance_type=resource_conf["instance_type"],
)
def is_worker_still_idle(self, worker_id):
    """Check whether a worker is still idle (has no task assigned).

    :param worker_id: ID of the worker to look up on the server
    :return: True if the worker has no task, or is not listed by the
        server at all (unknown workers are treated as idle); False if
        the worker currently has a task assigned.
    """
    self.logger.info("Checking if worker %r is still idle", worker_id)
    # Find the matching worker record, if any, in the server's list
    match = next(
        (w for w in self.api_client.workers.get_all() if w.id == worker_id),
        None,
    )
    if match is None:
        # Worker not reported by the server — treat it as idle
        return True
    return getattr(match, "task", None) is None
def supervisor(self):
"""
Spin up or down resources as necessary.
@@ -323,6 +330,9 @@ class AutoScaler(object):
continue
# Remove from both cloud and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN
if time() - timestamp > self.max_idle_time_min * MINUTE:
if not self.is_worker_still_idle(worker_id):
# Skip the worker if it is no longer idle
continue
wid = WorkerId(worker_id)
cloud_id = wid.cloud_id
self.driver.spin_down_worker(cloud_id)

View File

@@ -75,34 +75,70 @@ class PipelineController(object):
@attrs
class Node(object):
name = attrib(type=str) # pipeline step name
base_task_id = attrib(type=str, default=None) # base Task ID to be cloned and launched
task_factory_func = attrib(type=Callable, default=None) # alternative to base_task_id, function creating a Task
queue = attrib(type=str, default=None) # execution queue name to use
parents = attrib(type=list, default=None) # list of parent DAG steps
timeout = attrib(type=float, default=None) # execution timeout limit
parameters = attrib(type=dict, default=None) # Task hyper-parameters to change
configurations = attrib(type=dict, default=None) # Task configuration objects to change
task_overrides = attrib(type=dict, default=None) # Task overrides to change
executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet)
status = attrib(type=str, default="pending") # The Node Task status (cached, aborted, etc.)
clone_task = attrib(type=bool, default=True) # If True cline the base_task_id, then execute the cloned Task
job = attrib(type=ClearmlJob, default=None) # ClearMLJob object
job_type = attrib(type=str, default=None) # task type (string)
job_started = attrib(type=float, default=None) # job startup timestamp (epoch ts in seconds)
job_ended = attrib(type=float, default=None) # job startup timestamp (epoch ts in seconds)
job_code_section = attrib(type=str, default=None) # pipeline code configuration section name
skip_job = attrib(type=bool, default=False) # if True, this step should be skipped
continue_on_fail = attrib(type=bool, default=False) # if True, the pipeline continues even if the step failed
cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached
return_artifacts = attrib(type=list, default=None) # List of artifact names returned by the step
monitor_metrics = attrib(type=list, default=None) # List of metric title/series to monitor
monitor_artifacts = attrib(type=list, default=None) # List of artifact names to monitor
monitor_models = attrib(type=list, default=None) # List of models to monitor
explicit_docker_image = attrib(type=str, default=None) # The Docker image the node uses, specified at creation
recursively_parse_parameters = attrib(type=bool, default=False) # if True, recursively parse parameters in
# lists, dicts, or tuples
output_uri = attrib(type=Union[bool, str], default=None) # The default location for output models and other artifacts
# pipeline step name
name = attrib(type=str)
# base Task ID to be cloned and launched
base_task_id = attrib(type=str, default=None)
# alternative to base_task_id, function creating a Task
task_factory_func = attrib(type=Callable, default=None)
# execution queue name to use
queue = attrib(type=str, default=None)
# list of parent DAG steps
parents = attrib(type=list, default=None)
# execution timeout limit
timeout = attrib(type=float, default=None)
# Task hyper-parameters to change
parameters = attrib(type=dict, default=None)
# Task configuration objects to change
configurations = attrib(type=dict, default=None)
# Task overrides to change
task_overrides = attrib(type=dict, default=None)
# The actual executed Task ID (None if not executed yet)
executed = attrib(type=str, default=None)
# The Node Task status (cached, aborted, etc.)
status = attrib(type=str, default="pending")
# If True clone the base_task_id, then execute the cloned Task
clone_task = attrib(type=bool, default=True)
# ClearMLJob object
job = attrib(type=ClearmlJob, default=None)
# task type (string)
job_type = attrib(type=str, default=None)
# job startup timestamp (epoch ts in seconds)
job_started = attrib(type=float, default=None)
# job completion timestamp (epoch ts in seconds)
job_ended = attrib(type=float, default=None)
# pipeline code configuration section name
job_code_section = attrib(type=str, default=None)
# if True, this step should be skipped
skip_job = attrib(type=bool, default=False)
# if True this pipeline step should be cached
cache_executed_step = attrib(type=bool, default=False)
# List of artifact names returned by the step
return_artifacts = attrib(type=list, default=None)
# List of metric title/series to monitor
monitor_metrics = attrib(type=list, default=None)
# List of artifact names to monitor
monitor_artifacts = attrib(type=list, default=None)
# List of models to monitor
monitor_models = attrib(type=list, default=None)
# The Docker image the node uses, specified at creation
explicit_docker_image = attrib(type=str, default=None)
# if True, recursively parse parameters in lists, dicts, or tuples
recursively_parse_parameters = attrib(type=bool, default=False)
# The default location for output models and other artifacts
output_uri = attrib(type=Union[bool, str], default=None)
# Specify whether to create the Task as a draft
draft = attrib(type=bool, default=False)
# continue_behaviour dict, for private use. used to initialize fields related to continuation behaviour
continue_behaviour = attrib(type=dict, default=None)
# if True, the pipeline continues even if the step failed
continue_on_fail = attrib(type=bool, default=False)
# if True, the pipeline continues even if the step was aborted
continue_on_abort = attrib(type=bool, default=False)
# if True, the children of aborted steps are skipped
skip_children_on_abort = attrib(type=bool, default=True)
# if True, the children of failed steps are skipped
skip_children_on_fail = attrib(type=bool, default=True)
def __attrs_post_init__(self):
if self.parents is None:
@@ -121,6 +157,12 @@ class PipelineController(object):
self.monitor_artifacts = []
if self.monitor_models is None:
self.monitor_models = []
if self.continue_behaviour is not None:
self.continue_on_fail = self.continue_behaviour.get("continue_on_fail", True)
self.continue_on_abort = self.continue_behaviour.get("continue_on_abort", True)
self.skip_children_on_fail = self.continue_behaviour.get("skip_children_on_fail", True)
self.skip_children_on_abort = self.continue_behaviour.get("skip_children_on_abort", True)
self.continue_behaviour = None
def copy(self):
# type: () -> PipelineController.Node
@@ -178,7 +220,8 @@ class PipelineController(object):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> None
"""
@@ -271,6 +314,7 @@ class PipelineController(object):
:param skip_global_imports: If True, global imports will not be included in the steps' execution when creating
the steps from a functions, otherwise all global imports will be automatically imported in a safe manner at
the beginning of each steps execution. Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
if auto_version_bump is not None:
warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@@ -353,7 +397,7 @@ class PipelineController(object):
docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
)
self._task.set_packages(packages)
self._task.set_repo(repo, branch=repo_branch, commit=repo_commit)
self._task.set_script(repository=repo, branch=repo_branch, commit=repo_commit, working_dir=working_dir)
self._auto_connect_task = bool(self._task)
# make sure we add to the main Task the pipeline tag
if self._task and not self._pipeline_as_sub_project:
@@ -421,7 +465,8 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
recursively_parse_parameters=False, # type: bool
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -491,9 +536,10 @@ class PipelineController(object):
use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
:param clone_base_task: If True (default), the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, failed step will not cause the pipeline to stop
(or marked as failed). Notice, that steps that are connected (or indirectly connected)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@@ -566,9 +612,24 @@ class PipelineController(object):
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param continue_behaviour: Controls whether the pipeline will continue running after a
step failed/was aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
# always store callback functions (even when running remotely)
if pre_execute_callback:
self._pre_step_callbacks[name] = pre_execute_callback
@@ -624,7 +685,8 @@ class PipelineController(object):
monitor_metrics=monitor_metrics or [],
monitor_artifacts=monitor_artifacts or [],
monitor_models=monitor_models or [],
output_uri=self._output_uri if output_uri is None else output_uri
output_uri=self._output_uri if output_uri is None else output_uri,
continue_behaviour=continue_behaviour
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@@ -670,7 +732,10 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -764,9 +829,10 @@ class PipelineController(object):
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, failed step will not cause the pipeline to stop
(or marked as failed). Notice, that steps that are connected (or indirectly connected)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@@ -839,9 +905,27 @@ class PipelineController(object):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the script from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step failed/was aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
function_kwargs = function_kwargs or {}
default_kwargs = inspect.getfullargspec(function)
if default_kwargs and default_kwargs.args and default_kwargs.defaults:
@@ -879,7 +963,10 @@ class PipelineController(object):
retry_on_failure=retry_on_failure,
status_change_callback=status_change_callback,
tags=tags,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
working_dir=working_dir,
continue_behaviour=continue_behaviour
)
def start(
@@ -1505,10 +1592,26 @@ class PipelineController(object):
self._task.add_tags(tags)
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, output_uri=None
self,
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
branch,
commit,
helper_functions,
output_uri=None,
working_dir=None
):
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
@@ -1533,7 +1636,8 @@ class PipelineController(object):
task_template_header=self._task_template_header,
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports
skip_global_imports=self._skip_global_imports,
working_dir=working_dir
)
return task_definition
@@ -1981,7 +2085,10 @@ class PipelineController(object):
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -2075,9 +2182,10 @@ class PipelineController(object):
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, failed step will not cause the pipeline to stop
(or marked as failed). Notice, that steps that are connected (or indirectly connected)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False
:param pre_execute_callback: Callback function, called when the step (Task) is created,
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@@ -2150,6 +2258,21 @@ class PipelineController(object):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step failed/was aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: True if successful
"""
@@ -2200,21 +2323,51 @@ class PipelineController(object):
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
repo_branch,
repo_commit,
helper_functions,
output_uri=output_uri,
working_dir=working_dir,
)
elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
repo_branch,
repo_commit,
helper_functions,
output_uri=output_uri,
working_dir=working_dir,
)
# update configuration with the task definitions
# noinspection PyProtectedMember
self._task._set_configuration(
@@ -2258,7 +2411,9 @@ class PipelineController(object):
monitor_models=monitor_models,
job_code_section=job_code_section,
explicit_docker_image=docker,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
continue_behaviour=continue_behaviour
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@@ -2373,6 +2528,8 @@ class PipelineController(object):
if task_factory_func_task:
task_factory_func_task.delete(raise_on_error=False)
self._running_nodes.append(node.name)
elif node.draft:
self._running_nodes.append(node.name)
else:
self._running_nodes.append(node.name)
@@ -2738,7 +2895,13 @@ class PipelineController(object):
self._final_failure[node.name] = True
completed_jobs.append(j)
node.executed = node.job.task_id() if not (node_failed or node.job.is_aborted()) else False
if node.job.is_aborted():
node.executed = node.job.task_id() if not node.skip_children_on_abort else False
elif node_failed:
node.executed = node.job.task_id() if not node.skip_children_on_fail else False
else:
node.executed = node.job.task_id()
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
@@ -3329,7 +3492,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> ()
"""
@@ -3415,6 +3579,7 @@ class PipelineDecorator(PipelineController):
:param skip_global_imports: If True, global imports will not be included in the steps' execution, otherwise all
global imports will be automatically imported in a safe manner at the beginning of each steps execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
super(PipelineDecorator, self).__init__(
name=name,
@@ -3437,7 +3602,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
# if we are in eager execution, make sure parent class knows it
@@ -3500,7 +3666,14 @@ class PipelineDecorator(PipelineController):
else:
self._final_failure[node.name] = True
completed_jobs.append(j)
node.executed = node.job.task_id() if not (node_failed or node.job.is_aborted()) else False
if node.job.is_aborted():
node.executed = node.job.task_id() if not node.skip_children_on_abort else False
elif node_failed:
node.executed = node.job.task_id() if not node.skip_children_on_fail else False
else:
node.executed = node.job.task_id()
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
@@ -3655,11 +3828,26 @@ class PipelineDecorator(PipelineController):
self._force_task_configuration_update()
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit,
helper_functions, output_uri=None
self,
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
branch,
commit,
helper_functions,
output_uri=None,
working_dir=None
):
def sanitize(function_source):
matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source)
@@ -3669,15 +3857,15 @@ class PipelineDecorator(PipelineController):
open_parenthesis = 0
last_index = -1
for i, c in enumerate(function_source):
if not open_parenthesis and c == ')':
if not open_parenthesis and c == ")":
last_index = i
break
elif c == ')':
elif c == ")":
open_parenthesis -= 1
elif c == '(':
elif c == "(":
open_parenthesis += 1
if last_index >= 0:
function_source = function_source[last_index+1:].lstrip()
function_source = function_source[last_index + 1:].lstrip()
return function_source
task_definition = CreateFromFunction.create_task_from_function(
@@ -3704,7 +3892,8 @@ class PipelineDecorator(PipelineController):
_sanitize_function=sanitize,
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports
skip_global_imports=self._skip_global_imports,
working_dir=working_dir,
)
return task_definition
@@ -3738,6 +3927,8 @@ class PipelineDecorator(PipelineController):
def _wait_for_node(cls, node):
pool_period = 5.0 if cls._debug_execute_step_process else 20.0
while True:
if not node.job:
break
node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True)
job_status = str(node.job.status(force=True))
if (
@@ -3781,7 +3972,10 @@ class PipelineDecorator(PipelineController):
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False, # type: Optional[bool]
working_dir=None, # type: Optional[str]
continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> Callable
"""
@@ -3803,9 +3997,10 @@ class PipelineDecorator(PipelineController):
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the pipeline's default execution queue
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
:param continue_on_fail: (Deprecated, use `continue_behaviour` instead).
If True, failed step will not cause the pipeline to stop
(or marked as failed). Notice, that steps that are connected (or indirectly connected)
to the failed step will be skipped.
to the failed step will be skipped. Defaults to False
:param docker: Specify the docker image to be used when executing the pipeline step remotely
:param docker_args: Add docker execution arguments for the remote execution
(use single string for all docker arguments).
@@ -3921,10 +4116,28 @@ class PipelineDecorator(PipelineController):
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:param continue_behaviour: Controls whether the pipeline will continue running after a
step failed/was aborted. Different behaviours can be set using a dictionary of boolean options. Supported options are:
- continue_on_fail - If True, the pipeline will continue even if the step failed.
If False, the pipeline will stop
- continue_on_abort - If True, the pipeline will continue even if the step was aborted.
If False, the pipeline will stop
- skip_children_on_fail - If True, the children of this step will be skipped if it failed.
If False, the children will run even if this step failed.
Any parameters passed from the failed step to its children will default to None
- skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.
If False, the children will run even if this step was aborted.
Any parameters passed from the aborted step to its children will default to None
If the keys are not present in the dictionary, their values will default to True
:return: function wrapper
"""
def decorator_wrap(func):
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
# noinspection PyProtectedMember
unwrapped_func = CreateFromFunction._deep_extract_wrapped(func)
_name = name or str(unwrapped_func.__name__)
@@ -3966,7 +4179,10 @@ class PipelineDecorator(PipelineController):
post_execute_callback=post_execute_callback,
status_change_callback=status_change_callback,
tags=tags,
output_uri=output_uri
output_uri=output_uri,
draft=draft,
working_dir=working_dir,
continue_behaviour=continue_behaviour
)
if cls._singleton:
@@ -4134,8 +4350,13 @@ class PipelineDecorator(PipelineController):
except: # noqa
pass
# skipped job
if not _node.job:
return None
cls._wait_for_node(_node)
if (_node.job.is_failed() and not _node.continue_on_fail) or _node.job.is_aborted():
if (_node.job.is_failed() and not _node.continue_on_fail) or \
(_node.job.is_aborted() and not _node.continue_on_abort):
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
)
@@ -4201,7 +4422,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> Callable
"""
@@ -4318,6 +4540,7 @@ class PipelineDecorator(PipelineController):
:param skip_global_imports: If True, global imports will not be included in the steps' execution, otherwise all
global imports will be automatically imported in a safe manner at the beginning of each steps execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
def decorator_wrap(func):
@@ -4365,7 +4588,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@@ -4418,7 +4642,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
a_pipeline._args_map = args_map or {}
@@ -4588,7 +4813,10 @@ class PipelineDecorator(PipelineController):
if not cls._singleton._abort_running_steps_on_failure:
for parent in _node.parents:
if cls._singleton._nodes[parent].status in ["failed", "aborted", "skipped"]:
parent = cls._singleton._nodes[parent]
if parent.status == "failed" and parent.skip_children_on_fail or \
parent.status == "aborted" and parent.skip_children_on_abort or \
parent.status == "skipped":
_node.skip_job = True
return

View File

@@ -18,7 +18,6 @@ from ..backend_interface.util import get_or_create_project, exact_match_regex
from ..storage.util import hash_dict
from ..task import Task
from ..backend_api.services import tasks as tasks_service
from ..utilities.proxy_object import verify_basic_type, get_basic_type
logger = getLogger('clearml.automation.job')
@@ -304,7 +303,7 @@ class BaseJob(object):
def is_completed(self):
# type: () -> bool
"""
Return True, if job is has executed and completed successfully
Return True, if job was executed and completed successfully
:return: True the task is currently in completed or published state
"""
@@ -313,7 +312,7 @@ class BaseJob(object):
def is_aborted(self):
# type: () -> bool
"""
Return True, if job is has executed and aborted
Return True, if job was executed and aborted
:return: True the task is currently in aborted state
"""
@@ -646,12 +645,8 @@ class ClearmlJob(BaseJob):
if tags:
self.task.set_tags(list(set(self.task.get_tags()) | set(tags)))
if task_params:
param_types = {}
for key, value in task_params.items():
if verify_basic_type(value):
param_types[key] = get_basic_type(value)
self.task.set_parameters(task_params, __parameters_types=param_types)
if parameter_override:
self.task.update_parameters(parameter_override)
# store back Task configuration object into backend
if task_configurations:

View File

@@ -2034,7 +2034,10 @@ class HyperParameterOptimizer(object):
if job_id in completed_jobs:
if value != completed_jobs[job_id][0]:
iteration_value = self._objective_metric.get_current_raw_objective(job_id)
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
if iteration_value:
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
else:
iteration = [-1]
completed_jobs[job_id] = (
value,
iteration,
@@ -2049,7 +2052,10 @@ class HyperParameterOptimizer(object):
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self._objective_metric.get_current_raw_objective(job_id)
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
if iteration_value:
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
else:
iteration = [-1]
completed_jobs[job_id] = (
value,
iteration,

View File

@@ -136,6 +136,15 @@ class Metrics(InterfaceBase):
kwargs = {}
if entry:
key, url = ev.get_target_full_upload_uri(storage_uri, self.storage_key_prefix, quote_uri=False)
orig_url = url
try:
storage = self._get_storage(url)
if storage:
url = storage._apply_url_substitutions(url)
except Exception as err:
self._get_logger().warning("Failed applying URL substitutions on {} ({})".format(orig_url, err))
kwargs[entry.key_prop] = key
kwargs[entry.url_prop] = url
if not entry.stream:
@@ -222,7 +231,13 @@ class Metrics(InterfaceBase):
if batched_requests:
if self._offline_mode:
with open(self._offline_log_filename.as_posix(), 'at') as f:
f.write(json.dumps([b.to_dict() for b in batched_requests])+'\n')
requests = []
for b in batched_requests:
request = b.to_dict()
if self._for_model:
request["model_event"] = True
requests.append(request)
f.write(json.dumps(requests)+'\n')
return
req = api_events.AddBatchRequest(requests=batched_requests)
@@ -262,13 +277,16 @@ class Metrics(InterfaceBase):
pass
@classmethod
def report_offline_session(cls, task, folder, iteration_offset=0):
def report_offline_session(cls, task, folder, iteration_offset=0, remote_url=None, only_with_id=None, session=None):
from ... import StorageManager
filename = Path(folder) / cls.__offline_filename
if not filename.is_file():
return False
# noinspection PyProtectedMember
remote_url = task._get_default_report_storage_uri()
if not remote_url:
# noinspection PyProtectedMember
remote_url = task._get_default_report_storage_uri()
if not session:
session = task.session
if remote_url and remote_url.endswith('/'):
remote_url = remote_url[:-1]
uploaded_files = set()
@@ -281,6 +299,8 @@ class Metrics(InterfaceBase):
if not line:
break
list_requests = json.loads(line)
if only_with_id:
list_requests = [r for r in list_requests if r["task"] == only_with_id]
for r in list_requests:
# noinspection PyBroadException
try:
@@ -327,7 +347,7 @@ class Metrics(InterfaceBase):
warning('Failed reporting metric, line {} [{}]'.format(i, ex))
batch_requests = api_events.AddBatchRequest(requests=list_requests)
if batch_requests.requests:
res = task.session.send(batch_requests)
res = session.send(batch_requests)
if res and not res.ok():
warning("failed logging metric task to backend ({:d} lines, {})".format(
len(batch_requests.requests), str(res.meta)))

View File

@@ -42,7 +42,7 @@ class SetupUploadMixin(object):
warnings.warn(
"Warning: 'Task.setup_upload' is deprecated. "
"Use 'setup_aws_upload', 'setup_gcp_upload' or 'setup_azure_upload' instead",
DeprecationWarning
DeprecationWarning,
)
self.setup_aws_upload(
bucket_name,
@@ -100,7 +100,7 @@ class SetupUploadMixin(object):
multipart=multipart,
secure=secure,
verify=verify,
profile=profile
profile=profile,
)
StorageHelper.add_aws_configuration(self._bucket_config, log=self.log)
self.storage_uri = StorageHelper.get_aws_storage_uri_from_config(self._bucket_config)

View File

@@ -325,12 +325,12 @@ class CreateAndPopulate(object):
"+++ b{script_entry}\n" \
"@@ -{idx_a},0 +{idx_b},3 @@\n" \
"+from clearml import Task\n" \
"+Task.init()\n" \
"+(__name__ != \"__main__\") or Task.init()\n" \
"+\n".format(
script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1)
elif local_entry_file and lines:
# if we are here it means we do not have a git diff, but a single script file
init_lines = ["from clearml import Task\n", "Task.init()\n\n"]
init_lines = ["from clearml import Task\n", "(__name__ != \"__main__\") or Task.init()\n\n"]
task_state['script']['diff'] = ''.join(lines[:idx_a] + init_lines + lines[idx_a:])
# no need to add anything, we patched it.
task_init_patch = ""
@@ -338,7 +338,7 @@ class CreateAndPopulate(object):
# Add Task.init call
task_init_patch += \
"from clearml import Task\n" \
"Task.init()\n\n"
"(__name__ != \"__main__\") or Task.init()\n\n"
# make sure we add the diff at the end of the current diff
task_state['script']['diff'] = task_state['script'].get('diff', '')
@@ -524,7 +524,7 @@ if __name__ == '__main__':
if artifact_name in parent_task.artifacts:
kwargs[k] = parent_task.artifacts[artifact_name].get(deserialization_function={artifact_deserialization_function_name})
else:
kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name]
kwargs[k] = parent_task.get_parameters(cast=True).get(return_section + '/' + artifact_name)
results = {function_name}(**kwargs)
result_names = {function_return}
if result_names:
@@ -574,7 +574,8 @@ if __name__ == '__main__':
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
_sanitize_function=None, # type: Optional[Callable[[str], str]]
_sanitize_helper_functions=None, # type: Optional[Callable[[str], str]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> Optional[Dict, Task]
"""
@@ -660,6 +661,8 @@ if __name__ == '__main__':
:param skip_global_imports: If True, the global imports will not be fetched from the function's file, otherwise
all global imports will be automatically imported in a safe manner at the beginning of the function's
execution. Default is False
:param working_dir: (Optional) Working directory to launch the script from.
:return: Newly created Task object
"""
# not set -> equals True
@@ -774,6 +777,7 @@ if __name__ == '__main__':
docker_bash_setup_script=docker_bash_setup_script,
output_uri=output_uri,
add_task_init_call=False,
working_directory=working_dir
)
entry_point = '{}.py'.format(function_name)
task = populate.create_task(dry_run=dry_run)
@@ -781,7 +785,7 @@ if __name__ == '__main__':
if dry_run:
task['script']['diff'] = task_template
task['script']['entry_point'] = entry_point
task['script']['working_dir'] = '.'
task['script']['working_dir'] = working_dir or '.'
task['hyperparams'] = {
cls.kwargs_section: {
k: dict(section=cls.kwargs_section, name=k,

File diff suppressed because it is too large Load Diff

View File

@@ -1360,13 +1360,19 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
"Delete hyper-parameter is not supported by your clearml-server, "
"upgrade to the latest version")
force_kwargs = {}
if Session.check_min_api_version("2.13"):
force_kwargs["force"] = force
with self._edit_lock:
paramkey = tasks.ParamKey(section=name.split('/', 1)[0], name=name.split('/', 1)[1])
res = self.send(tasks.DeleteHyperParamsRequest(
task=self.id, hyperparams=[paramkey], force=force), raise_on_errors=False)
paramkey = tasks.ParamKey(section=name.split("/", 1)[0], name=name.split("/", 1)[1])
res = self.send(
tasks.DeleteHyperParamsRequest(task=self.id, hyperparams=[paramkey], **force_kwargs),
raise_on_errors=False,
)
self.reload()
return res.ok()
return res.ok() if not self._offline_mode else True
def update_parameters(self, *args, **kwargs):
# type: (*dict, **Any) -> ()
@@ -2267,6 +2273,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
Alternatively, you can add all requirements from a file.
Example: Task.add_requirements('/path/to/your/project/requirements.txt')
.. note::
Task.add_requirements does not directly modify the task's requirements. Instead, it improves the accuracy
of capturing a task's Python packages. To explicitly change task requirements, use
Task.set_packages, which overwrites existing packages with the specified ones.
:param str package_name: The package name or path to a requirements file
to add to the "Installed Packages" section of the task.
:param package_version: The package version requirements. If ``None``, then use the installed version.
@@ -2305,13 +2316,14 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def force_requirements_env_freeze(cls, force=True, requirements_file=None):
# type: (bool, Optional[Union[str, Path]]) -> None
"""
Force using `pip freeze` / `conda list` to store the full requirements of the active environment
(instead of statically analyzing the running code and listing directly imported packages)
Force the use of ``pip freeze`` or ``conda list`` to capture the requirements from the active
environment (instead of statically analyzing the running code and listing directly imported packages).
Notice: Must be called before `Task.init` !
:param force: Set force using `pip freeze` flag on/off
:param requirements_file: Optional pass requirements.txt file to use (instead of `pip freeze` or automatic
analysis)
:param force: If ``True`` (default), force the use of ``pip freeze`` or ``conda list`` to capture the
requirements. If ``False``, ClearML statically analyzes the code for requirements.
:param requirements_file: (Optional) Pass a requirements.txt file to specify the required packages (instead of
``pip freeze`` or automatic analysis). This will overwrite any existing requirement listing.
"""
cls._force_use_pip_freeze = requirements_file if requirements_file else bool(force)

View File

@@ -464,6 +464,10 @@ class Artifacts(object):
artifact_type_data.content_type = "text/csv"
np.savetxt(local_filename, artifact_object, delimiter=",")
delete_after_upload = True
elif pd and isinstance(artifact_object, pd.DataFrame) \
and (isinstance(artifact_object.index, pd.MultiIndex) or
isinstance(artifact_object.columns, pd.MultiIndex)):
store_as_pickle = True
elif pd and isinstance(artifact_object, pd.DataFrame):
artifact_type = "pandas"
artifact_type_data.preview = preview or str(artifact_object.__repr__())

View File

@@ -276,17 +276,6 @@ class PatchFastaiV2(object):
value=metric.value,
iteration=self.__train_iter,
)
except Exception:
pass
def before_step(self):
# noinspection PyBroadException
try:
if hasattr(fastai.learner.Recorder, "before_step"):
super().before_step() # noqa
if not PatchFastaiV2._current_task:
return
logger = PatchFastaiV2._current_task.get_logger()
gradients = [
x.grad.clone().detach().cpu() for x in self.learn.model.parameters() if x.grad is not None
] # noqa

View File

@@ -1,7 +1,9 @@
import sys
from typing import Any, Callable
import six
import threading
import importlib
from pathlib2 import Path
@@ -108,6 +110,68 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
del PatchPyTorchModelIO._checkpoint_filename[tid]
return ret
@staticmethod
def _patch_lightning_io_internal(lightning_name):
# type: (str) -> None
"""
:param lightning_name: lightning module name, use "lightning" or "pytorch_lightning"
"""
try:
pytorch_lightning = importlib.import_module(lightning_name)
except ImportError:
# lightning is not installed
# Nothing to do
return
if lightning_name == "lightning":
pytorch_lightning = pytorch_lightning.pytorch
def patch_method(cls, method_name, patched_method):
# type: (type, str, Callable[..., Any]) -> None
"""
Patch a method of a class if it exists.
Otherwise, no effect.
"""
try:
method = getattr(cls, method_name)
except AttributeError:
# the method is not defined on the given class
pass
else:
setattr(cls, method_name,
_patched_call(method, patched_method))
patch_method(pytorch_lightning.trainer.Trainer, "save_checkpoint",
PatchPyTorchModelIO._save)
patch_method(pytorch_lightning.trainer.Trainer, "restore",
PatchPyTorchModelIO._load_from_obj)
try:
checkpoint_connector = pytorch_lightning.trainer.connectors.checkpoint_connector
except AttributeError:
# checkpoint_connector does not yet exist; lightning version is < 0.10.0
# Nothing left to do
return
try:
CheckpointConnector = checkpoint_connector._CheckpointConnector
except AttributeError:
# CheckpointConnector has not yet been made protected
# lighting version is < 2.0.0
try:
CheckpointConnector = checkpoint_connector.CheckpointConnector
except AttributeError:
# Unexpected future breaking change in lightning
# No way to automatically handle
return
patch_method(CheckpointConnector, "save_checkpoint",
PatchPyTorchModelIO._save)
patch_method(CheckpointConnector, "restore",
PatchPyTorchModelIO._load_from_obj)
@staticmethod
def _patch_lightning_io():
if PatchPyTorchModelIO.__patched_lightning:
@@ -118,41 +182,7 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
PatchPyTorchModelIO.__patched_lightning = True
# noinspection PyBroadException
try:
import lightning # noqa
lightning.pytorch.trainer.Trainer.save_checkpoint = _patched_call(
lightning.pytorch.trainer.Trainer.save_checkpoint, PatchPyTorchModelIO._save
) # noqa
lightning.pytorch.trainer.Trainer.restore = _patched_call(
lightning.pytorch.trainer.Trainer.restore, PatchPyTorchModelIO._load_from_obj
) # noqa
except ImportError:
pass
except Exception:
pass
# noinspection PyBroadException
try:
import lightning # noqa
# noinspection PyUnresolvedReferences
lightning.pytorch.trainer.connectors.checkpoint_connector.CheckpointConnector.save_checkpoint = _patched_call(
lightning.pytorch.trainer.connectors.checkpoint_connector.CheckpointConnector.save_checkpoint,
PatchPyTorchModelIO._save,
) # noqa
# noinspection PyUnresolvedReferences
lightning.pytorch.trainer.connectors.checkpoint_connector.CheckpointConnector.restore = _patched_call(
lightning.pytorch.trainer.connectors.checkpoint_connector.CheckpointConnector.restore,
PatchPyTorchModelIO._load_from_obj,
) # noqa
except ImportError:
pass
except Exception:
pass
PatchPyTorchModelIO._patch_lightning_io_internal("lightning")
@staticmethod
def _patch_pytorch_lightning_io():
@@ -164,39 +194,7 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
PatchPyTorchModelIO.__patched_pytorch_lightning = True
# noinspection PyBroadException
try:
import pytorch_lightning # noqa
pytorch_lightning.trainer.Trainer.save_checkpoint = _patched_call(
pytorch_lightning.trainer.Trainer.save_checkpoint, PatchPyTorchModelIO._save) # noqa
pytorch_lightning.trainer.Trainer.restore = _patched_call(
pytorch_lightning.trainer.Trainer.restore, PatchPyTorchModelIO._load_from_obj) # noqa
except ImportError:
pass
except Exception:
pass
# noinspection PyBroadException
try:
import pytorch_lightning # noqa
# noinspection PyUnresolvedReferences
pytorch_lightning.trainer.connectors.checkpoint_connector.CheckpointConnector.save_checkpoint = \
_patched_call(
pytorch_lightning.trainer.connectors.checkpoint_connector.CheckpointConnector.save_checkpoint,
PatchPyTorchModelIO._save) # noqa
# noinspection PyUnresolvedReferences
pytorch_lightning.trainer.connectors.checkpoint_connector.CheckpointConnector.restore = \
_patched_call(
pytorch_lightning.trainer.connectors.checkpoint_connector.CheckpointConnector.restore,
PatchPyTorchModelIO._load_from_obj) # noqa
except ImportError:
pass
except Exception:
pass
PatchPyTorchModelIO._patch_lightning_io_internal("pytorch_lightning")
@staticmethod
def _save(original_fn, obj, f, *args, **kwargs):
@@ -334,4 +332,4 @@ class PatchPyTorchModelIO(PatchBaseModelIO):
def __get_cached_checkpoint_filename():
tid = threading.current_thread().ident
checkpoint_filename = PatchPyTorchModelIO._checkpoint_filename.get(tid)
return checkpoint_filename or None
return checkpoint_filename or None

View File

@@ -156,7 +156,7 @@
vcs_repo_detect_async: true
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
# This stores the "git diff" into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff: true
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset
@@ -209,7 +209,15 @@
report_global_mem_used: false
# if provided, start resource reporting after this amount of seconds
#report_start_sec: 30
# report_start_sec: 30
# set the initial time (seconds) to wait for iteration reporting to be used as x-axis for the
# resource monitoring, if timeout exceeds then reverts to "seconds from start"
# wait_for_first_iteration_to_start_sec: 30
# set the maximum time (seconds) to allow the resource monitoring to revert back to
# iteration reporting x-axis after starting to report "seconds from start"
# max_wait_for_first_iteration_to_start_sec: 1800
}
}

View File

@@ -28,6 +28,8 @@ SUPPRESS_UPDATE_MESSAGE_ENV_VAR = EnvEntry("CLEARML_SUPPRESS_UPDATE_MESSAGE", "T
MAX_SERIES_PER_METRIC = EnvEntry("CLEARML_MAX_SERIES_PER_METRIC", default=100, type=int)
JUPYTER_PASSWORD = EnvEntry("CLEARML_JUPYTER_PASSWORD")
# Repository detection
VCS_REPO_TYPE = EnvEntry("CLEARML_VCS_REPO_TYPE", "TRAINS_VCS_REPO_TYPE", default="git")
VCS_REPOSITORY_URL = EnvEntry("CLEARML_VCS_REPO_URL", "TRAINS_VCS_REPO_URL")

View File

@@ -1678,6 +1678,10 @@ class Dataset(object):
def get_instance(dataset_id_):
task = Task.get_task(task_id=dataset_id_)
if cls.__tag not in task.get_system_tags():
raise ValueError("Provided id={} is not a Dataset ID".format(task.id))
if task.status == "created":
raise ValueError("Dataset id={} is in draft mode, delete and recreate it".format(task.id))
force_download = False if task.status in ("stopped", "published", "closed", "completed") else True

View File

@@ -14,23 +14,23 @@ from six import BytesIO
default_level = logging.INFO
_levelToName = {
logging.CRITICAL: 'CRITICAL',
logging.ERROR: 'ERROR',
logging.WARNING: 'WARNING',
logging.INFO: 'INFO',
logging.DEBUG: 'DEBUG',
logging.NOTSET: 'NOTSET',
logging.CRITICAL: "CRITICAL",
logging.ERROR: "ERROR",
logging.WARNING: "WARNING",
logging.INFO: "INFO",
logging.DEBUG: "DEBUG",
logging.NOTSET: "NOTSET",
}
_nameToLevel = {
'CRITICAL': logging.CRITICAL,
'FATAL': logging.FATAL,
'ERROR': logging.ERROR,
'WARN': logging.WARNING,
'WARNING': logging.WARNING,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG,
'NOTSET': logging.NOTSET,
"CRITICAL": logging.CRITICAL,
"FATAL": logging.FATAL,
"ERROR": logging.ERROR,
"WARN": logging.WARNING,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"NOTSET": logging.NOTSET,
}
@@ -48,7 +48,6 @@ def resolve_logging_level(level):
class PickledLogger(logging.getLoggerClass()):
def __init__(self, *args, **kwargs):
super(PickledLogger, self).__init__(*args, **kwargs)
self._init_kwargs = None
@@ -59,14 +58,17 @@ class PickledLogger(logging.getLoggerClass()):
if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
return a_instance
safe_logger = PickledLogger(name=kwargs.get('name'))
safe_logger = PickledLogger(name=kwargs.get("name"))
safe_logger.__dict__ = a_instance.__dict__
if 'stream' in kwargs and kwargs['stream']:
kwargs['stream'] = 'stdout' if kwargs['stream'] == sys.stdout else (
'stderr' if kwargs['stream'] == sys.stderr else kwargs['stream'])
if "stream" in kwargs and kwargs["stream"]:
kwargs["stream"] = (
"stdout"
if kwargs["stream"] == sys.stdout
else ("stderr" if kwargs["stream"] == sys.stderr else kwargs["stream"])
)
else:
kwargs['stream'] = None
kwargs['_func'] = func
kwargs["stream"] = None
kwargs["_func"] = func
safe_logger._init_kwargs = kwargs
return safe_logger
@@ -74,15 +76,17 @@ class PickledLogger(logging.getLoggerClass()):
return self._init_kwargs or {}
def __setstate__(self, state):
state['stream'] = sys.stdout if state['stream'] == 'stdout' else (
sys.stderr if state['stream'] == 'stderr' else state['stream'])
_func = state.pop('_func') or self.__class__
state["stream"] = (
sys.stdout
if state["stream"] == "stdout"
else (sys.stderr if state["stream"] == "stderr" else state["stream"])
)
_func = state.pop("_func") or self.__class__
self.__dict__ = _func(**state).__dict__
class _LevelRangeFilter(logging.Filter):
def __init__(self, min_level, max_level, name=''):
def __init__(self, min_level, max_level, name=""):
super(_LevelRangeFilter, self).__init__(name)
self.min_level = min_level
self.max_level = max_level
@@ -103,21 +107,18 @@ class LoggerRoot(object):
if level is None and getenv("CLEARML_LOG_LEVEL"):
level = resolve_logging_level(getenv("CLEARML_LOG_LEVEL").strip())
if level is None:
print('Invalid value in environment variable CLEARML_LOG_LEVEL: %s' % getenv("CLEARML_LOG_LEVEL"))
print("Invalid value in environment variable CLEARML_LOG_LEVEL: %s" % getenv("CLEARML_LOG_LEVEL"))
clearml_logger = logging.getLogger('clearml')
clearml_logger = logging.getLogger("clearml")
if level is None:
level = clearml_logger.level
# avoid nested imports
from ..config import get_log_redirect_level
LoggerRoot.__base_logger = PickledLogger.wrapper(
clearml_logger,
func=cls.get_base_logger,
level=level,
stream=stream,
colored=colored
clearml_logger, func=cls.get_base_logger, level=level, stream=stream, colored=colored
)
LoggerRoot.__base_logger.setLevel(level)
@@ -129,9 +130,7 @@ class LoggerRoot(object):
# Adjust redirect level in case requested level is higher (e.g. logger is requested for CRITICAL
# and redirect is set for ERROR, in which case we redirect from CRITICAL)
redirect_level = max(level, redirect_level)
LoggerRoot.__base_logger.addHandler(
ClearmlStreamHandler(redirect_level, sys.stderr, colored)
)
LoggerRoot.__base_logger.addHandler(ClearmlStreamHandler(redirect_level, sys.stderr, colored))
if level < redirect_level:
# Not all levels were redirected, remaining should be sent to requested stream
@@ -139,9 +138,7 @@ class LoggerRoot(object):
handler.addFilter(_LevelRangeFilter(min_level=level, max_level=redirect_level - 1))
LoggerRoot.__base_logger.addHandler(handler)
else:
LoggerRoot.__base_logger.addHandler(
ClearmlStreamHandler(level, stream, colored)
)
LoggerRoot.__base_logger.addHandler(ClearmlStreamHandler(level, stream, colored))
LoggerRoot.__base_logger.propagate = False
return LoggerRoot.__base_logger
@@ -157,28 +154,27 @@ class LoggerRoot(object):
# https://github.com/pytest-dev/pytest/issues/5502#issuecomment-647157873
loggers = [logging.getLogger()] + list(logging.Logger.manager.loggerDict.values())
for logger in loggers:
handlers = getattr(logger, 'handlers', [])
handlers = getattr(logger, "handlers", [])
for handler in handlers:
if isinstance(handler, ClearmlLoggerHandler):
logger.removeHandler(handler)
def add_options(parser):
""" Add logging options to an argparse.ArgumentParser object """
"""Add logging options to an argparse.ArgumentParser object"""
level = logging.getLevelName(default_level)
parser.add_argument(
'--log-level', '-l', default=level, help='Log level (default is %s)' % level)
parser.add_argument("--log-level", "-l", default=level, help="Log level (default is %s)" % level)
def apply_logging_args(args):
""" Apply logging args from an argparse.ArgumentParser parsed args """
"""Apply logging args from an argparse.ArgumentParser parsed args"""
global default_level
default_level = logging.getLevelName(args.log_level.upper())
def get_logger(path=None, level=None, stream=None, colored=False):
""" Get a python logging object named using the provided filename and preconfigured with a color-formatted
stream handler
"""Get a python logging object named using the provided filename and preconfigured with a color-formatted
stream handler
"""
# noinspection PyBroadException
try:
@@ -196,42 +192,42 @@ def get_logger(path=None, level=None, stream=None, colored=False):
ch.setLevel(level)
log.addHandler(ch)
log.propagate = True
return PickledLogger.wrapper(
log, func=get_logger, path=path, level=level, stream=stream, colored=colored)
return PickledLogger.wrapper(log, func=get_logger, path=path, level=level, stream=stream, colored=colored)
def _add_file_handler(logger, log_dir, fh, formatter=None):
""" Adds a file handler to a logger """
"""Adds a file handler to a logger"""
Path(log_dir).mkdir(parents=True, exist_ok=True)
if not formatter:
log_format = '%(asctime)s %(name)s x_x[%(levelname)s] %(message)s'
log_format = "%(asctime)s %(name)s x_x[%(levelname)s] %(message)s"
formatter = logging.Formatter(log_format)
fh.setFormatter(formatter)
logger.addHandler(fh)
def add_rotating_file_handler(logger, log_dir, log_file_prefix, max_bytes=10 * 1024 * 1024, backup_count=20,
formatter=None):
""" Create and add a rotating file handler to a logger """
def add_rotating_file_handler(
logger, log_dir, log_file_prefix, max_bytes=10 * 1024 * 1024, backup_count=20, formatter=None
):
"""Create and add a rotating file handler to a logger"""
fh = ClearmlRotatingFileHandler(
str(Path(log_dir) / ('%s.log' % log_file_prefix)), maxBytes=max_bytes, backupCount=backup_count)
str(Path(log_dir) / ("%s.log" % log_file_prefix)), maxBytes=max_bytes, backupCount=backup_count
)
_add_file_handler(logger, log_dir, fh, formatter)
def add_time_rotating_file_handler(logger, log_dir, log_file_prefix, when='midnight', formatter=None):
def add_time_rotating_file_handler(logger, log_dir, log_file_prefix, when="midnight", formatter=None):
"""
Create and add a time rotating file handler to a logger.
Possible values for when are 'midnight', weekdays ('w0'-'W6', when 0 is Monday), and 's', 'm', 'h' amd 'd' for
seconds, minutes, hours and days respectively (case-insensitive)
Create and add a time rotating file handler to a logger.
Possible values for when are 'midnight', weekdays ('w0'-'W6', when 0 is Monday), and 's', 'm', 'h' and 'd' for
seconds, minutes, hours and days respectively (case-insensitive)
"""
fh = ClearmlTimedRotatingFileHandler(
str(Path(log_dir) / ('%s.log' % log_file_prefix)), when=when)
fh = ClearmlTimedRotatingFileHandler(str(Path(log_dir) / ("%s.log" % log_file_prefix)), when=when)
_add_file_handler(logger, log_dir, fh, formatter)
def get_null_logger(name=None):
""" Get a logger with a null handler """
log = logging.getLogger(name if name else 'null')
"""Get a logger with a null handler"""
log = logging.getLogger(name if name else "null")
if not log.handlers:
# avoid nested imports
from ..config import config
@@ -242,10 +238,10 @@ def get_null_logger(name=None):
class TqdmLog(object):
""" Tqdm (progressbar) wrapped logging class """
"""Tqdm (progressbar) wrapped logging class"""
class _TqdmIO(BytesIO):
""" IO wrapper class for Tqdm """
"""IO wrapper class for Tqdm"""
def __init__(self, level=20, logger=None, *args, **kwargs):
self._log = logger or get_null_logger()
@@ -253,18 +249,24 @@ class TqdmLog(object):
BytesIO.__init__(self, *args, **kwargs)
def write(self, buf):
self._buf = buf.strip('\r\n\t ')
self._buf = buf.strip("\r\n\t ")
def flush(self):
self._log.log(self._level, self._buf)
def __init__(self, total, desc='', log_level=20, ascii=False, logger=None, smoothing=0, mininterval=5, initial=0):
def __init__(self, total, desc="", log_level=20, ascii=False, logger=None, smoothing=0, mininterval=5, initial=0):
from tqdm import tqdm
self._io = self._TqdmIO(level=log_level, logger=logger)
self._tqdm = tqdm(total=total, desc=desc, file=self._io, ascii=ascii if not system() == 'Windows' else True,
smoothing=smoothing,
mininterval=mininterval, initial=initial)
self._tqdm = tqdm(
total=total,
desc=desc,
file=self._io,
ascii=ascii if not system() == "Windows" else True,
smoothing=smoothing,
mininterval=mininterval,
initial=initial,
)
def update(self, n=None):
if n is not None:

View File

@@ -930,7 +930,8 @@ class Logger(object):
image=image,
iter=iteration or 0,
upload_uri=upload_uri,
max_image_history=max_image_history if max_image_history is not None
max_image_history=max_image_history
if max_image_history is not None
else self._default_max_sample_history,
delete_after_upload=delete_after_upload,
)
@@ -976,13 +977,7 @@ class Logger(object):
- ``False`` - Do not delete
"""
mutually_exclusive(
UsageError,
_check_none=True,
local_path=local_path or None,
url=url or None,
stream=stream
)
mutually_exclusive(UsageError, _check_none=True, local_path=local_path or None, url=url or None, stream=stream)
if stream is not None and not file_extension:
raise ValueError("No file extension provided for stream media upload")

View File

@@ -3,6 +3,7 @@ import os
import zipfile
import shutil
from tempfile import mkstemp
from uuid import uuid4
import six
import math
@@ -1070,6 +1071,10 @@ class BaseModel(object):
except Exception:
pass
def get_offline_mode_folder(self):
    """Return the offline-mode folder of the currently running Task."""
    # Imported locally to avoid a circular import at module load time.
    from clearml import Task
    return Task.current_task().get_offline_mode_folder()
def _init_reporter(self):
if self._reporter:
return
@@ -2154,6 +2159,16 @@ class OutputModel(BaseModel):
"""
return self._get_base_model().upload_storage_uri
@property
def id(self):
    # type: () -> str
    """Model ID. In offline mode a locally generated unique ID is used instead."""
    from clearml import Task as OfflineTask
    if not OfflineTask.is_offline():
        return super(OutputModel, self).id
    # lazily mint a unique offline ID once, then keep reusing it
    if not self._base_model_id:
        self._base_model_id = "offline-{}".format(uuid4().hex)
    return self._base_model_id
def __init__(
self,
task=None, # type: Optional[Task]
@@ -2810,7 +2825,8 @@ class OutputModel(BaseModel):
update_comment=update_comment,
is_package=is_package
),
output_uri=self._get_base_model().upload_storage_uri or self._default_output_uri
output_uri=self._get_base_model().upload_storage_uri or self._default_output_uri,
id=self.id
)
)
return weights_filename_offline or register_uri

View File

@@ -1,4 +1,5 @@
""" Local and remote storage support """
from .manager import StorageManager
__all__ = ["StorageManager"]

View File

@@ -363,8 +363,9 @@ class StorageManager(object):
overwrite=False,
skip_zero_size_check=False,
silence_errors=False,
max_workers=None
):
# type: (str, Optional[str], Optional[str], bool, bool, bool) -> Optional[str]
# type: (str, Optional[str], Optional[str], bool, bool, bool, Optional[int]) -> Optional[str]
"""
Download remote folder recursively to the local machine, maintaining the sub folder structure
from the remote storage.
@@ -387,6 +388,11 @@ class StorageManager(object):
:param bool skip_zero_size_check: If True, no error will be raised for files with zero bytes size.
:param bool silence_errors: If True, silence errors that might pop up when trying to download
files stored remotely. Default False
:param int max_workers: If value is set to a number,
it will spawn the specified number of worker threads
to download the contents of the folder in parallel. Otherwise, if set to None, it will
internally use as many threads as there are
logical CPU cores in the system (this is default Python behavior). Default None
:return: Target local folder
"""
@@ -405,7 +411,7 @@ class StorageManager(object):
helper = StorageHelper.get(remote_url)
results = []
with ThreadPool() as pool:
with ThreadPool(processes=max_workers) as pool:
for path in helper.list(prefix=remote_url):
remote_path = (
str(Path(helper.base_url) / Path(path))

View File

@@ -37,7 +37,7 @@ import six
from pathlib2 import Path
from .backend_config.defs import get_active_config_file, get_config_file
from .backend_api.services import tasks, projects, events
from .backend_api.services import tasks, projects, events, queues
from .backend_api.session.session import (
Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, )
from .backend_api.session.defs import (ENV_DEFERRED_TASK_INIT, ENV_IGNORE_MISSING_CONFIG,
@@ -91,7 +91,7 @@ from .utilities.config import verify_basic_value
from .binding.args import (
argparser_parseargs_called, get_argparser_last_args,
argparser_update_currenttask, )
from .utilities.dicts import ReadOnlyDict, merge_dicts
from .utilities.dicts import ReadOnlyDict, merge_dicts, RequirementsDict
from .utilities.proxy_object import (
ProxyDictPreWrite, ProxyDictPostWrite, flatten_dictionary,
nested_from_flat_dictionary, naive_nested_from_flat_dictionary, StubObject as _TaskStub)
@@ -248,7 +248,7 @@ class Task(_Task):
output_uri=None, # type: Optional[Union[str, bool]]
auto_connect_arg_parser=True, # type: Union[bool, Mapping[str, bool]]
auto_connect_frameworks=True, # type: Union[bool, Mapping[str, Union[bool, str, list]]]
auto_resource_monitoring=True, # type: bool
auto_resource_monitoring=True, # type: Union[bool, Mapping[str, Any]]
auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]]
deferred_init=False, # type: bool
):
@@ -411,6 +411,19 @@ class Task(_Task):
- ``True`` - Automatically create resource monitoring plots. (default)
- ``False`` - Do not automatically create.
- Class Type - Create ResourceMonitor object of the specified class type.
- dict - Dictionary of kwargs to be passed to the ResourceMonitor instance.
The keys can be:
- `report_start_sec` OR `first_report_sec` OR `seconds_from_start` - Maximum number of seconds
to wait for scalar/plot reporting before defaulting
to machine statistics reporting based on seconds from experiment start time
- `wait_for_first_iteration_to_start_sec` - Set the initial time (seconds) to wait for iteration
reporting to be used as x-axis for the resource monitoring,
if timeout exceeds then reverts to `seconds_from_start`
- `max_wait_for_first_iteration_to_start_sec` - Set the maximum time (seconds) to allow the resource
monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start`
- `report_mem_used_per_process` OR `report_global_mem_used` - Compatibility feature,
report memory usage for the entire machine
default (false), report only on the running process and its sub-processes
:param auto_connect_streams: Control the automatic logging of stdout and stderr.
The values are:
@@ -479,20 +492,28 @@ class Task(_Task):
cls.__main_task._detect_repo_async_thread = None
cls.__main_task._dev_worker = None
cls.__main_task._resource_monitor = None
# remove the logger from the previous process
cls.__main_task.get_logger()
# create a new logger (to catch stdout/err)
cls.__main_task._logger = None
cls.__main_task.__reporter = None
# noinspection PyProtectedMember
cls.__main_task._get_logger(auto_connect_streams=auto_connect_streams)
cls.__main_task._artifacts_manager = Artifacts(cls.__main_task)
# if we are using threads to send the reports,
# after forking there are no threads, so we will need to recreate them
if not getattr(cls, '_report_subprocess_enabled'):
# remove the logger from the previous process
cls.__main_task.get_logger()
# create a new logger (to catch stdout/err)
cls.__main_task._logger = None
cls.__main_task.__reporter = None
# noinspection PyProtectedMember
cls.__main_task._get_logger(auto_connect_streams=auto_connect_streams)
cls.__main_task._artifacts_manager = Artifacts(cls.__main_task)
# unregister signal hooks, they cause subprocess to hang
# noinspection PyProtectedMember
cls.__main_task.__register_at_exit(cls.__main_task._at_exit)
# start all reporting threads
BackgroundMonitor.start_all(task=cls.__main_task)
# if we are using threads to send the reports,
# after forking there are no threads, so we will need to recreate them
if not getattr(cls, '_report_subprocess_enabled'):
# start all reporting threads
BackgroundMonitor.start_all(task=cls.__main_task)
if not running_remotely():
verify_defaults_match()
@@ -543,6 +564,7 @@ class Task(_Task):
if not is_sub_process_task_id and deferred_init and deferred_init != cls.__nested_deferred_init_flag:
def completed_cb(x):
Task.__forked_proc_main_pid = os.getpid()
Task.__main_task = x
getLogger().warning("ClearML initializing Task in the background")
@@ -643,6 +665,7 @@ class Task(_Task):
except Exception:
raise
else:
Task.__forked_proc_main_pid = os.getpid()
Task.__main_task = task
# register at exist only on the real (none deferred) Task
@@ -729,10 +752,31 @@ class Task(_Task):
if auto_resource_monitoring and not is_sub_process_task_id:
resource_monitor_cls = auto_resource_monitoring \
if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor
resource_monitor_kwargs = dict(
report_mem_used_per_process=not config.get("development.worker.report_global_mem_used", False),
first_report_sec=config.get("development.worker.report_start_sec", None),
wait_for_first_iteration_to_start_sec=config.get(
"development.worker.wait_for_first_iteration_to_start_sec", None
),
max_wait_for_first_iteration_to_start_sec=config.get(
"development.worker.max_wait_for_first_iteration_to_start_sec", None
),
)
if isinstance(auto_resource_monitoring, dict):
if "report_start_sec" in auto_resource_monitoring:
auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop("report_start_sec")
if "seconds_from_start" in auto_resource_monitoring:
auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop(
"seconds_from_start"
)
if "report_global_mem_used" in auto_resource_monitoring:
auto_resource_monitoring["report_mem_used_per_process"] = auto_resource_monitoring.pop(
"report_global_mem_used"
)
resource_monitor_kwargs.update(auto_resource_monitoring)
task._resource_monitor = resource_monitor_cls(
task,
report_mem_used_per_process=not config.get('development.worker.report_global_mem_used', False),
first_report_sec=config.get('development.worker.report_start_sec', None),
**resource_monitor_kwargs
)
task._resource_monitor.start()
@@ -1580,7 +1624,8 @@ class Task(_Task):
def set_packages(self, packages):
# type: (Union[str, Path, Sequence[str]]) -> ()
"""
Manually specify a list of required packages or a local requirements.txt file.
Manually specify a list of required packages or a local requirements.txt file. Note that this will
overwrite all existing packages.
When running remotely this call is ignored
@@ -1640,6 +1685,19 @@ class Task(_Task):
self.data.script.version_num = commit or ""
self._edit(script=self.data.script)
def get_requirements(self):
    # type: () -> RequirementsDict
    """
    Get the task's requirements

    :return: A `RequirementsDict` object that holds the `pip`, `conda`, `orig_pip` requirements.
    """
    # When running locally as the main task, give the asynchronous
    # repository/requirements detection a chance to finish first.
    if not running_remotely() and self.is_main_task():
        self._wait_for_repo_detection(timeout=300.0)
    result = RequirementsDict()
    result.update(self.data.script.requirements)
    return result
def connect_configuration(self, configuration, name=None, description=None, ignore_remote_overrides=False):
# type: (Union[Mapping, list, Path, str], Optional[str], Optional[str], bool) -> Union[dict, Path, str]
"""
@@ -2823,21 +2881,47 @@ class Task(_Task):
docker_setup_bash_script=docker_setup_bash_script
)
def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800):
# type: (float) -> bool
@classmethod
def set_resource_monitor_iteration_timeout(
cls,
seconds_from_start=30.0,
wait_for_first_iteration_to_start_sec=180.0,
max_wait_for_first_iteration_to_start_sec=1800.0,
):
# type: (float, float, float) -> bool
"""
Set the ResourceMonitor maximum duration (in seconds) to wait until first scalar/plot is reported.
If timeout is reached without any reporting, the ResourceMonitor will start reporting machine statistics based
on seconds from Task start time (instead of based on iteration)
on seconds from Task start time (instead of based on iteration).
Notice! Should be called before `Task.init`.
:param seconds_from_start: Maximum number of seconds to wait for scalar/plot reporting before defaulting
to machine statistics reporting based on seconds from experiment start time
:param wait_for_first_iteration_to_start_sec: Set the initial time (seconds) to wait for iteration reporting
to be used as x-axis for the resource monitoring, if timeout exceeds then reverts to `seconds_from_start`
:param max_wait_for_first_iteration_to_start_sec: Set the maximum time (seconds) to allow the resource
monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start`
:return: True if success
"""
if not self._resource_monitor:
return False
self._resource_monitor.wait_for_first_iteration = seconds_from_start
self._resource_monitor.max_check_first_iteration = seconds_from_start
if ResourceMonitor._resource_monitor_instances:
getLogger().warning(
"Task.set_resource_monitor_iteration_timeout called after Task.init."
" This might not work since the values might not be used in forked processes"
)
# noinspection PyProtectedMember
for instance in ResourceMonitor._resource_monitor_instances:
# noinspection PyProtectedMember
instance._first_report_sec = seconds_from_start
instance.wait_for_first_iteration = wait_for_first_iteration_to_start_sec
instance.max_check_first_iteration = max_wait_for_first_iteration_to_start_sec
# noinspection PyProtectedMember
ResourceMonitor._first_report_sec_default = seconds_from_start
# noinspection PyProtectedMember
ResourceMonitor._wait_for_first_iteration_to_start_sec_default = wait_for_first_iteration_to_start_sec
# noinspection PyProtectedMember
ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default = max_wait_for_first_iteration_to_start_sec
return True
def execute_remotely(self, queue_name=None, clone=False, exit_process=True):
@@ -3335,10 +3419,24 @@ class Task(_Task):
if output_model.get("output_uri"):
model.set_upload_destination(output_model.get("output_uri"))
model.update_weights(auto_delete_file=False, **output_model["weights"])
Metrics.report_offline_session(
model,
session_folder,
iteration_offset=iteration_offset,
remote_url=task_holding_reports._get_default_report_storage_uri(),
only_with_id=output_model["id"],
session=task_holding_reports.session
)
# logs
TaskHandler.report_offline_session(task_holding_reports, session_folder, iteration_offset=iteration_offset)
# metrics
Metrics.report_offline_session(task_holding_reports, session_folder, iteration_offset=iteration_offset)
Metrics.report_offline_session(
task_holding_reports,
session_folder,
iteration_offset=iteration_offset,
only_with_id=export_data["id"],
session=task_holding_reports.session,
)
# print imported results page
print('ClearML results page: {}'.format(task_holding_reports.get_output_log_web_page()))
task_holding_reports.mark_completed()
@@ -3464,6 +3562,29 @@ class Task(_Task):
LOG_TO_BACKEND_ENV_VAR.set(True)
DEBUG_SIMULATE_REMOTE_TASK.set(True)
def get_executed_queue(self, return_name=False):
    # type: (bool) -> Optional[str]
    """
    Get the queue the task was executed on.

    :param return_name: If True, return the name of the queue. Otherwise, return its ID
    :return: Return the ID or name of the queue the task was executed on.
        If no queue was found, return None
    """
    queue_id = self.data.execution.queue
    if queue_id and return_name:
        try:
            response = Task._send(Task._get_default_session(), queues.GetByIdRequest(queue_id))
            return response.response.queue.name
        except Exception as e:
            getLogger().warning("Could not get name of queue with ID '{}': {}".format(queue_id, e))
            return None
    # either the caller wants the raw ID, or there is no queue at all (None)
    return queue_id
@classmethod
def _create(cls, project_name=None, task_name=None, task_type=TaskTypes.training):
# type: (Optional[str], Optional[str], Task.TaskTypes) -> TaskInstance
@@ -3547,6 +3668,7 @@ class Task(_Task):
return
task = cls.__main_task
cls.__main_task = None
cls.__forked_proc_main_pid = None
if task._dev_worker:
task._dev_worker.unregister()
task._dev_worker = None
@@ -3683,6 +3805,7 @@ class Task(_Task):
# mark us as the main Task, there should only be one dev Task at a time.
if not Task.__main_task:
Task.__forked_proc_main_pid = os.getpid()
Task.__main_task = task
# mark the task as started
@@ -4253,6 +4376,7 @@ class Task(_Task):
# this is so in theory we can close a main task and start a new one
if self.is_main_task():
Task.__main_task = None
Task.__forked_proc_main_pid = None
Task.__update_master_pid_task(task=None)
except Exception:
# make sure we do not interrupt the exit process

View File

@@ -3,7 +3,7 @@ import attr
from attr import validators
__all__ = ['range_validator', 'param', 'percent_param', 'TaskParameters']
__all__ = ["range_validator", "param", "percent_param", "TaskParameters"]
def _canonize_validator(current_validator):
@@ -35,23 +35,15 @@ def range_validator(min_value, max_value):
:param max_value: The maximum limit of the range, inclusive. None for no maximum limit.
:return: A new range validator.
"""
def _range_validator(instance, attribute, value):
if ((min_value is not None) and (value < min_value)) or \
((max_value is not None) and (value > max_value)):
if ((min_value is not None) and (value < min_value)) or ((max_value is not None) and (value > max_value)):
raise ValueError("{} must be in range [{}, {}]".format(attribute.name, min_value, max_value))
return _range_validator
def param(
validator=None,
range=None,
type=None,
desc=None,
metadata=None,
*args,
**kwargs
):
def param(validator=None, range=None, type=None, desc=None, metadata=None, *args, **kwargs):
"""
A parameter inside a TaskParameters class.

View File

@@ -115,6 +115,20 @@ class NestedBlobsDict(BlobsDict):
return self._keys(self, '')
class RequirementsDict(dict):
    """A requirements mapping exposing the well-known requirement sections
    ("pip", "conda", "orig_pip") as read-only attributes.
    A missing section yields None.
    """

    def _section(self, name):
        # Return the named requirements section, or None when absent.
        return self.get(name)

    @property
    def pip(self):
        return self._section("pip")

    @property
    def conda(self):
        return self._section("conda")

    @property
    def orig_pip(self):
        return self._section("orig_pip")
def merge_dicts(dict1, dict2):
""" Recursively merges dict2 into dict1 """
if not isinstance(dict1, dict) or not isinstance(dict2, dict):

View File

@@ -17,10 +17,13 @@ from __future__ import unicode_literals
import json
import platform
import sys
import subprocess
from datetime import datetime
from ctypes import c_uint32, byref, c_int64
import psutil
from ..gpu import pynvml as N
from ..gpu import pyrsmi as R
NOT_SUPPORTED = 'Not Supported'
MB = 1024 * 1024
@@ -56,6 +59,21 @@ class GPUStat(object):
"""
return self.entry['uuid']
@property
def mig_index(self):
    """
    Returns the index of the MIG partition (as in nvidia-smi).
    Returns None when this entry does not describe a MIG partition.
    """
    return self.entry.get("mig_index")
@property
def mig_uuid(self):
    """
    Returns the uuid of the MIG partition returned by nvidia-smi when running in MIG mode,
    e.g. MIG-12345678-abcd-abcd-uuid-123456abcdef
    Returns None when this entry does not describe a MIG partition.
    """
    return self.entry.get("mig_uuid")
@property
def name(self):
"""
@@ -160,6 +178,7 @@ class GPUStatCollection(object):
_initialized = False
_device_count = None
_gpu_device_info = {}
_mig_device_info = {}
def __init__(self, gpu_list, driver_version=None, driver_cuda_version=None):
self.gpus = gpu_list
@@ -177,7 +196,184 @@ class GPUStatCollection(object):
del GPUStatCollection.global_processes[pid]
@staticmethod
def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
def _new_query_amd(shutdown=False, per_process_stats=False, get_driver_info=False):
    """Query the information of all AMD GPUs on the local machine via the ROCm SMI bindings.

    :param shutdown: If True, shut down the SMI library after the query
        (only if this call was the one that initialized it).
    :param per_process_stats: If True, also collect per-process GPU memory usage.
    :param get_driver_info: If True, also query the installed AMD driver version.
    :return: A GPUStatCollection with one GPUStat per detected device.
    """
    initialized = False
    if not GPUStatCollection._initialized:
        R.smi_initialize()
        GPUStatCollection._initialized = True
        initialized = True

    def get_gpu_info(index):
        def amd_query_processes():
            num_procs = c_uint32()
            ret = R.rocm_lib.rsmi_compute_process_info_get(None, byref(num_procs))
            if not R.rsmi_ret_ok(ret):
                return []
            buff_sz = num_procs.value + 10
            # BUG FIX: use a distinct name for the ctypes array (it was previously
            # named `proc_info` and shadowed by the loop variable below)
            proc_info_buffer = (R.rsmi_process_info_t * buff_sz)()
            ret = R.rocm_lib.rsmi_compute_process_info_get(byref(proc_info_buffer), byref(num_procs))
            proc_info_list = (
                [proc_info_buffer[i] for i in range(num_procs.value)]
                if R.rsmi_ret_ok(ret)
                else []
            )
            result_proc_info_list = []
            # query VRAM usage explicitly, as rsmi_compute_process_info_get
            # doesn't actually return VRAM usage
            for proc_info in proc_info_list:
                vram_query_proc_info = R.rsmi_process_info_t()
                ret = R.rocm_lib.rsmi_compute_process_info_by_pid_get(
                    int(proc_info.process_id), byref(vram_query_proc_info)
                )
                if R.rsmi_ret_ok(ret):
                    proc_info.vram_usage = vram_query_proc_info.vram_usage
                    result_proc_info_list.append(proc_info)
            return result_proc_info_list

        def get_fan_speed():
            # Returns fan speed as a fraction of max speed, or None when unavailable
            fan_level = c_int64()
            fan_max = c_int64()
            sensor_ind = c_uint32(0)
            ret = R.rocm_lib.rsmi_dev_fan_speed_get(index, sensor_ind, byref(fan_level))
            if not R.rsmi_ret_ok(ret, log_error=False):
                return None
            ret = R.rocm_lib.rsmi_dev_fan_speed_max_get(index, sensor_ind, byref(fan_max))
            if not R.rsmi_ret_ok(ret, log_error=False):
                return None
            # BUG FIX: compare the ctypes .value -- comparing the c_int64 wrapper
            # object itself (`fan_max <= 0`) raises TypeError on Python 3
            if fan_level.value <= 0 or fan_max.value <= 0:
                return None
            return float(fan_level.value) / float(fan_max.value)

        def get_process_info(comp_process):
            process = {}
            pid = comp_process.process_id
            # skip global_processes caching because PID querying seems to be inconsistent atm
            # if pid not in GPUStatCollection.global_processes:
            #     GPUStatCollection.global_processes[pid] = psutil.Process(pid=pid)
            process["pid"] = pid
            try:
                process["gpu_memory_usage"] = comp_process.vram_usage // MB
            except Exception:
                pass
            return process

        # cache static per-device info (name/uuid) across queries
        if not GPUStatCollection._gpu_device_info.get(index):
            uuid = R.smi_get_device_id(index)
            name = R.smi_get_device_name(index)
            GPUStatCollection._gpu_device_info[index] = (name, uuid)

        name, uuid = GPUStatCollection._gpu_device_info[index]

        temperature = None  # TODO: fetch temperature. It should be possible
        fan_speed = get_fan_speed()

        try:
            memory_total = R.smi_get_device_memory_total(index)
        except Exception:
            memory_total = None
        try:
            memory_used = R.smi_get_device_memory_used(index)
        except Exception:
            memory_used = None
        try:
            utilization = R.smi_get_device_utilization(index)
        except Exception:
            utilization = None
        try:
            power = R.smi_get_device_average_power(index)
        except Exception:
            power = None
        power_limit = None  # TODO: find a way to fetch this

        processes = []
        if per_process_stats:
            try:
                comp_processes = amd_query_processes()
            except Exception:
                comp_processes = []
            for comp_process in comp_processes:
                try:
                    process = get_process_info(comp_process)
                except psutil.NoSuchProcess:
                    # skip process caching for now
                    pass
                else:
                    processes.append(process)

        gpu_info = {
            "index": index,
            "uuid": uuid,
            "name": name,
            "temperature.gpu": temperature if temperature is not None else 0,
            "fan.speed": fan_speed if fan_speed is not None else 0,
            "utilization.gpu": utilization if utilization is not None else 100,
            "power.draw": power if power is not None else 0,
            "enforced.power.limit": power_limit if power_limit is not None else 0,
            # Convert bytes into MBytes
            "memory.used": memory_used // MB if memory_used is not None else 0,
            "memory.total": memory_total // MB if memory_total is not None else 100,
            "processes": None if (processes and all(p is None for p in processes)) else processes,
        }
        if per_process_stats:
            GPUStatCollection.clean_processes()
        return gpu_info

    gpu_list = []
    if GPUStatCollection._device_count is None:
        GPUStatCollection._device_count = R.smi_get_device_count()

    for index in range(GPUStatCollection._device_count):
        gpu_info = get_gpu_info(index)
        gpu_stat = GPUStat(gpu_info)
        gpu_list.append(gpu_stat)

    if shutdown and initialized:
        R.smi_shutdown()
        GPUStatCollection._initialized = False

    # noinspection PyProtectedMember
    driver_version = GPUStatCollection._get_amd_driver_version() if get_driver_info else None

    return GPUStatCollection(gpu_list, driver_version=driver_version, driver_cuda_version=None)
@staticmethod
def _get_amd_driver_version():
    """Best-effort query of the installed AMD driver version.

    Tries `rocm-smi --showdriverversion --json` first; falls back to querying the
    kernel driver version through the bundled pyrsmi bindings in a subprocess
    (so a native crash such as a SEGFAULT cannot take down this process).

    :return: The driver version string, or None if it could not be determined.
    """
    # make sure the program doesn't crash with something like a SEGFAULT when querying the driver version
    try:
        process = subprocess.Popen(["rocm-smi", "--showdriverversion", "--json"], stdout=subprocess.PIPE)
        out, _ = process.communicate()
        return json.loads(out)["system"]["Driver version"]
    except Exception:
        try:
            process = subprocess.Popen(
                [
                    sys.executable,
                    "-c",
                    "from clearml.utilities.gpu.pyrsmi import smi_get_kernel_version, smi_initialize; "
                    + "smi_initialize(); "
                    + "print(smi_get_kernel_version())",
                ],
                # BUG FIX: without stdout=PIPE, communicate() returns None for stdout
                # and the original `out.strip()` always raised, so this branch could
                # never return a version
                stdout=subprocess.PIPE,
            )
            out, _ = process.communicate()
            if process.returncode != 0 or not out:
                return None
            return out.decode().strip()
        except Exception:
            return None
@staticmethod
def _running_in_amd_env():
    """Return True when a ROCm SMI shared library is present on this machine."""
    # noinspection PyProtectedMember
    rocm_lib_location = R._find_lib_rocm()
    return bool(rocm_lib_location)
@staticmethod
def _new_query_nvidia(shutdown=False, per_process_stats=False, get_driver_info=False):
"""Query the information of all the GPUs on local machine"""
initialized = False
if not GPUStatCollection._initialized:
@@ -190,7 +386,7 @@ class GPUStatCollection(object):
return b.decode() # for python3, to unicode
return b
def get_gpu_info(index, handle):
def get_gpu_info(index, handle, is_mig=False):
"""Get one GPU information specified by nvml handle"""
def get_process_info(nv_process):
@@ -226,12 +422,13 @@ class GPUStatCollection(object):
pass
return process
if not GPUStatCollection._gpu_device_info.get(index):
device_info = GPUStatCollection._mig_device_info if is_mig else GPUStatCollection._gpu_device_info
if not device_info.get(index):
name = _decode(N.nvmlDeviceGetName(handle))
uuid = _decode(N.nvmlDeviceGetUUID(handle))
GPUStatCollection._gpu_device_info[index] = (name, uuid)
device_info[index] = (name, uuid)
name, uuid = GPUStatCollection._gpu_device_info[index]
name, uuid = device_info[index]
try:
temperature = N.nvmlDeviceGetTemperature(
@@ -327,8 +524,37 @@ class GPUStatCollection(object):
for index in range(GPUStatCollection._device_count):
handle = N.nvmlDeviceGetHandleByIndex(index)
gpu_info = get_gpu_info(index, handle)
gpu_stat = GPUStat(gpu_info)
gpu_list.append(gpu_stat)
mig_cnt = 0
# noinspection PyBroadException
try:
mig_cnt = N.nvmlDeviceGetMaxMigDeviceCount(handle)
except Exception:
pass
if mig_cnt <= 0:
gpu_list.append(GPUStat(gpu_info))
continue
got_mig_info = False
for mig_index in range(mig_cnt):
# noinspection PyBroadException
try:
mig_handle = N.nvmlDeviceGetMigDeviceHandleByIndex(handle, mig_index)
mig_info = get_gpu_info(mig_index, mig_handle, is_mig=True)
mig_info["mig_name"] = mig_info["name"]
mig_info["name"] = gpu_info["name"]
mig_info["mig_index"] = mig_info["index"]
mig_info["mig_uuid"] = mig_info["uuid"]
mig_info["index"] = gpu_info["index"]
mig_info["uuid"] = gpu_info["uuid"]
mig_info["temperature.gpu"] = gpu_info["temperature.gpu"]
mig_info["fan.speed"] = gpu_info["fan.speed"]
gpu_list.append(GPUStat(mig_info))
got_mig_info = True
except Exception:
pass
if not got_mig_info:
gpu_list.append(GPUStat(gpu_info))
# 2. additional info (driver version, etc).
if get_driver_info:
@@ -363,6 +589,20 @@ class GPUStatCollection(object):
return GPUStatCollection(gpu_list, driver_version=driver_version, driver_cuda_version=cuda_driver_version)
@staticmethod
def new_query(shutdown=False, per_process_stats=False, get_driver_info=False):
    """Query all local GPUs, dispatching to the AMD or NVIDIA implementation."""
    # noinspection PyProtectedMember
    if GPUStatCollection._running_in_amd_env():
        query_impl = GPUStatCollection._new_query_amd
    else:
        query_impl = GPUStatCollection._new_query_nvidia
    return query_impl(shutdown=shutdown, per_process_stats=per_process_stats, get_driver_info=get_driver_info)
def __len__(self):
    # Number of GPU entries in this collection (one per device, or per MIG partition).
    return len(self.gpus)

View File

@@ -0,0 +1,608 @@
# MIT License
#
# Copyright (c) 2023 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# flake8: noqa
# This is only to ignore all flake8 errors in this file
# Python bindings for ROCm-SMI library
from ctypes import *
from os.path import join, realpath, isfile
import os
import logging
import subprocess
import sys
import threading
from enum import IntEnum, auto
def get_device_uuids():
    """Get the UUIDs of all ROCm devices from rocminfo output,
    according to HSA spec.
    """
    device_uuids = []
    try:
        completed = subprocess.run(
            ['rocminfo'],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=True,
        )
    except (FileNotFoundError, subprocess.CalledProcessError) as err:
        # best effort: report and return what we have (an empty list)
        print(' Error => ', str(err))
        return device_uuids
    # keep only GPU agent UUID lines, dropping CPU devices from the output
    for line in completed.stdout.decode('utf-8').split('\n'):
        if 'Uuid' in line and 'GPU-' in line:
            device_uuids.append(line.strip().split('GPU-')[-1])
    return device_uuids
## Error checking
class ROCMLError_NotSupported(Exception):
    """The requested ROCm SMI feature is not supported on this system."""
    pass


class ROCMLError_FunctionNotFound(Exception):
    """The requested symbol was not found in the loaded ROCm SMI library."""
    pass


class ROCMLError_LibraryNotFound(Exception):
    """The ROCm SMI shared library could not be located."""
    pass


class ROCMLError_DriverNotLoaded(Exception):
    """The AMD GPU driver is not loaded."""
    pass


class ROCMLError_Unknown(Exception):
    """An unspecified ROCm SMI error occurred."""
    pass


class ROCMLError_Uninitialized(Exception):
    """A ROCm SMI function was requested before the library was loaded/initialized."""
    pass
class ROCMLState(IntEnum):
    """Initialization/availability states of the PyROCML bindings."""

    UNINITIALIZED = auto()
    """No attempt yet made to initialize PyROCML"""
    INITIALIZED = auto()
    """PyROCML was successfully initialized"""
    DISABLED_PYROCML_NOT_AVAILABLE = auto()
    """PyROCML not installed"""
    DISABLED_CONFIG = auto()
    """PyROCML diagnostics disabled by ``distributed.diagnostics.rocml`` config setting"""
    DISABLED_LIBRARY_NOT_FOUND = auto()
    """PyROCML available, but ROCML not installed"""
LIBROCM_NAME = 'librocm_smi64.so'
RSMI_MAX_BUFFER_LENGTH = 256
# Policy enums
RSMI_MAX_NUM_FREQUENCIES = 32
class rsmi_status_t(c_int):
    # Return codes of the ROCm SMI C API; see rsmi_status_verbose_err_out
    # for the corresponding human-readable descriptions.
    RSMI_STATUS_SUCCESS = 0x0
    RSMI_STATUS_INVALID_ARGS = 0x1
    RSMI_STATUS_NOT_SUPPORTED = 0x2
    RSMI_STATUS_FILE_ERROR = 0x3
    RSMI_STATUS_PERMISSION = 0x4
    RSMI_STATUS_OUT_OF_RESOURCES = 0x5
    RSMI_STATUS_INTERNAL_EXCEPTION = 0x6
    RSMI_STATUS_INPUT_OUT_OF_BOUNDS = 0x7
    RSMI_STATUS_INIT_ERROR = 0x8
    # alias kept for compatibility with older callers
    RSMI_INITIALIZATION_ERROR = RSMI_STATUS_INIT_ERROR
    RSMI_STATUS_NOT_YET_IMPLEMENTED = 0x9
    RSMI_STATUS_NOT_FOUND = 0xA
    RSMI_STATUS_INSUFFICIENT_SIZE = 0xB
    RSMI_STATUS_INTERRUPT = 0xC
    RSMI_STATUS_UNEXPECTED_SIZE = 0xD
    RSMI_STATUS_NO_DATA = 0xE
    RSMI_STATUS_UNKNOWN_ERROR = 0xFFFFFFFF
# Dictionary mapping rsmi return codes to their verbose descriptions
rsmi_status_verbose_err_out = {
rsmi_status_t.RSMI_STATUS_SUCCESS: 'Operation was successful',
rsmi_status_t.RSMI_STATUS_INVALID_ARGS: 'Invalid arguments provided',
rsmi_status_t.RSMI_STATUS_NOT_SUPPORTED: 'Not supported on the given system',
rsmi_status_t.RSMI_STATUS_FILE_ERROR: 'Problem accessing a file',
rsmi_status_t.RSMI_STATUS_PERMISSION: 'Permission denied',
rsmi_status_t.RSMI_STATUS_OUT_OF_RESOURCES: 'Unable to acquire memory or other resource',
rsmi_status_t.RSMI_STATUS_INTERNAL_EXCEPTION: 'An internal exception was caught',
rsmi_status_t.RSMI_STATUS_INPUT_OUT_OF_BOUNDS: 'Provided input is out of allowable or safe range',
rsmi_status_t.RSMI_INITIALIZATION_ERROR: 'Error occured during rsmi initialization',
rsmi_status_t.RSMI_STATUS_NOT_YET_IMPLEMENTED: 'Requested function is not implemented on this setup',
rsmi_status_t.RSMI_STATUS_NOT_FOUND: 'Item searched for but not found',
rsmi_status_t.RSMI_STATUS_INSUFFICIENT_SIZE: 'Insufficient resources available',
rsmi_status_t.RSMI_STATUS_INTERRUPT: 'Interrupt occured during execution',
rsmi_status_t.RSMI_STATUS_UNEXPECTED_SIZE: 'Unexpected amount of data read',
rsmi_status_t.RSMI_STATUS_NO_DATA: 'No data found for the given input',
rsmi_status_t.RSMI_STATUS_UNKNOWN_ERROR: 'Unknown error occured'
}
class rsmi_init_flags_t(c_int):
RSMI_INIT_FLAG_ALL_GPUS = 0x1
class rsmi_memory_type_t(c_int):
RSMI_MEM_TYPE_FIRST = 0
RSMI_MEM_TYPE_VRAM = RSMI_MEM_TYPE_FIRST
RSMI_MEM_TYPE_VIS_VRAM = 1
RSMI_MEM_TYPE_GTT = 2
RSMI_MEM_TYPE_LAST = RSMI_MEM_TYPE_GTT
# memory_type_l includes names for with rsmi_memory_type_t
# Usage example to get corresponding names:
# memory_type_l[rsmi_memory_type_t.RSMI_MEM_TYPE_VRAM] will return string 'vram'
memory_type_l = ['VRAM', 'VIS_VRAM', 'GTT']
class rsmi_retired_page_record_t(Structure):
    # ctypes mirror of the rsmi_retired_page_record_t C struct
    # (one record describing a retired/reserved memory page).
    _fields_ = [('page_address', c_uint64),
                ('page_size', c_uint64),
                ('status', c_int)]
class rsmi_sw_component_t(c_int):
    # Software component selector for rsmi_version_str_get
    # (currently only the kernel driver component is defined).
    RSMI_SW_COMP_FIRST = 0x0
    RSMI_SW_COMP_DRIVER = RSMI_SW_COMP_FIRST
    RSMI_SW_COMP_LAST = RSMI_SW_COMP_DRIVER
class rsmi_frequencies_t(Structure):
    # ctypes mirror of the rsmi_frequencies_t C struct:
    # 'frequency' holds the supported values, 'current' indexes the active one.
    _fields_ = [('num_supported', c_int32),
                ('current', c_uint32),
                ('frequency', c_uint64 * RSMI_MAX_NUM_FREQUENCIES)]
class rsmi_pcie_bandwidth_t(Structure):
    # ctypes mirror of the rsmi_pcie_bandwidth_t C struct:
    # per supported transfer rate, the matching PCIe lane count.
    _fields_ = [('transfer_rate', rsmi_frequencies_t),
                ('lanes', c_uint32 * RSMI_MAX_NUM_FREQUENCIES)]
class rsmi_process_info_t(Structure):
    # ctypes mirror of the rsmi_process_info_t C struct describing one
    # compute process as reported by rsmi_compute_process_info_get.
    _fields_ = [('process_id', c_uint32),
                ('pasid', c_uint32),  # presumably PASID: Process Address Space ID -- TODO confirm against rocm_smi_lib headers
                ('vram_usage', c_uint64),
                ('sdma_usage', c_uint64),  # SDMA: System Direct Memory Access
                ('cu_occupancy', c_uint32)]
class rsmi_xgmi_status_t(c_int):
    # XGMI link error status values returned by rsmi_dev_xgmi_error_status
    RSMI_XGMI_STATUS_NO_ERRORS = 0
    RSMI_XGMI_STATUS_ERROR = 1
    RSMI_XGMI_STATUS_MULTIPLE_ERRORS = 2
class rsmi_io_link_type(c_int):
    # IO link types as reported by rsmi_topo_get_link_type
    RSMI_IOLINK_TYPE_UNDEFINED = 0
    RSMI_IOLINK_TYPE_HYPERTRANSPORT = 1
    RSMI_IOLINK_TYPE_PCIEXPRESS = 2
    RSMI_IOLINK_TYPE_AMBA = 3
    RSMI_IOLINK_TYPE_MIPI = 4
    RSMI_IOLINK_TYPE_QPI_1_1 = 5
    RSMI_IOLINK_TYPE_RESERVED1 = 6
    RSMI_IOLINK_TYPE_RESERVED2 = 7
    RSMI_IOLINK_TYPE_RAPID_IO = 8
    RSMI_IOLINK_TYPE_INFINIBAND = 9
    RSMI_IOLINK_TYPE_RESERVED3 = 10
    RSMI_IOLINK_TYPE_XGMI = 11
    RSMI_IOLINK_TYPE_XGOP = 12
    RSMI_IOLINK_TYPE_GZ = 13
    RSMI_IOLINK_TYPE_ETHERNET_RDMA = 14
    RSMI_IOLINK_TYPE_RDMA_OTHER = 15
    RSMI_IOLINK_TYPE_OTHER = 16
    # number of defined link types (not itself a link type)
    RSMI_IOLINK_TYPE_NUMIOLINKTYPES = 17
    # forces the underlying C enum to 32 bits
    RSMI_IOLINK_TYPE_SIZE = 0xFFFFFFFF
## Library loading
# Handle to the loaded ROCm SMI shared library (None until _load_rocm_library succeeds)
rocm_lib = None
# Serializes library loading and refcount updates across threads
lib_load_lock = threading.Lock()
# smi_initialize() calls minus smi_shutdown() calls
_rocm_lib_refcount = 0
## Function access, to prevent lib_load_lock deadlock
# Cache of resolved function pointers so repeat lookups skip the lock
_rocml_get_function_ptr_cache = dict()
def _rocml_get_function_ptr(name):
    """Return (and cache) the function pointer named *name* from the loaded ROCm library.

    :param name: symbol name to resolve on the shared library
    :raises ROCMLError_Uninitialized: if the library has not been loaded yet
    :raises ROCMLError_FunctionNotFound: if the symbol does not exist
    """
    global rocm_lib
    # fast path: cache hits need no lock
    if name in _rocml_get_function_ptr_cache:
        return _rocml_get_function_ptr_cache[name]
    # `with` guarantees the lock is released even when we raise
    with lib_load_lock:
        # ensure library was loaded
        if rocm_lib is None:
            raise ROCMLError_Uninitialized
        try:
            fn_ptr = getattr(rocm_lib, name)
        except AttributeError:
            raise ROCMLError_FunctionNotFound
        _rocml_get_function_ptr_cache[name] = fn_ptr
        return fn_ptr
def _load_rocm_library():
    """Load the ROCm SMI shared library if not already loaded (thread-safe, idempotent).

    :raises ROCMLError_NotSupported: on Windows
    :raises ROCMLError_LibraryNotFound: if the library cannot be located/loaded
    """
    global rocm_lib
    if rocm_lib is None:
        # double-checked locking: re-test under the lock because another
        # thread may have finished loading while we waited
        with lib_load_lock:
            if rocm_lib is None:
                try:
                    if sys.platform[:3] == 'win':
                        raise ROCMLError_NotSupported('Windows platform is not supported yet')
                    # assume linux; a single CDLL call both loads and binds the
                    # library (the previous extra cdll.LoadLibrary was redundant)
                    path_librocm = _find_lib_rocm()
                    rocm_lib = CDLL(path_librocm)
                except OSError:
                    raise ROCMLError_LibraryNotFound('ROCm library not found')
                if rocm_lib is None:
                    raise ROCMLError_LibraryNotFound('ROCm library not found')
def _find_lib_rocm():
    """Locate the ROCm SMI shared library.

    Honors the ROCM_PATH environment variable (default '/opt/rocm');
    returns the full path when the file exists, otherwise an empty string.
    """
    base_dir = os.environ.get('ROCM_PATH', '/opt/rocm')
    candidate = join(base_dir, 'lib', LIBROCM_NAME)
    if isfile(candidate):
        return candidate
    return ''
def _driver_initialized():
""" Returns true if amdgpu is found in the list of initialized modules
"""
initialized = ''
try:
initialized = str(subprocess.check_output("cat /sys/module/amdgpu/initstate |grep live", shell=True))
except subprocess.CalledProcessError:
pass
return len(initialized) > 0
def smi_initialize():
    """Initialize the ROCm binding of SMI.

    Loads the shared library, verifies the amdgpu driver is live, and calls
    rsmi_init. On success the library reference count is incremented.

    :raises RuntimeError: when the driver is not initialized or rsmi_init fails
    """
    _load_rocm_library()
    # guard clause: refuse to init the SMI library without a live driver
    if not _driver_initialized():
        raise RuntimeError('ROCm driver initialization failed')
    ret_init = rocm_lib.rsmi_init(0)
    if ret_init != 0:
        logging.error('ROCm SMI init returned value {}'.format(ret_init))
        raise RuntimeError('ROCm SMI initialization failed')
    # update reference count
    global _rocm_lib_refcount
    with lib_load_lock:
        _rocm_lib_refcount += 1
def rsmi_ret_ok(my_ret, log_error=False):
    """Return True when an RSMI call status equals RSMI_STATUS_SUCCESS.

    @param my_ret: status code returned by an rsmi_* call (rocm_smi_lib API)
    @param log_error: when True, log the library's description of the failure
    """
    if my_ret == rsmi_status_t.RSMI_STATUS_SUCCESS:
        return True
    if log_error:
        # ask the library itself for a human-readable description
        err_str = c_char_p()
        rocm_lib.rsmi_status_string(my_ret, byref(err_str))
        logging.error(err_str.value.decode())
    return False
def smi_shutdown():
    """Shut down the RSMI interface but keep the shared library loaded.

    Decrements the library reference count under the load lock.
    """
    rsmi_ret_ok(rocm_lib.rsmi_shut_down())
    # update reference count
    global _rocm_lib_refcount
    with lib_load_lock:
        _rocm_lib_refcount -= 1
def smi_get_kernel_version():
    """Return the ROCm kernel driver version string, or '' on failure."""
    buffer_len = 256
    version_buffer = create_string_buffer(buffer_len)
    status = rocm_lib.rsmi_version_str_get(
        rsmi_sw_component_t.RSMI_SW_COMP_DRIVER, version_buffer, buffer_len)
    if not rsmi_ret_ok(status):
        return ''
    return version_buffer.value.decode()
def smi_get_device_id(dev):
    """Return the device id of GPU *dev* as a 64-bit integer, or -1 on failure."""
    device_id = c_uint64()
    status = rocm_lib.rsmi_dev_id_get(dev, byref(device_id))
    if not rsmi_ret_ok(status):
        return -1
    return device_id.value
def smi_get_device_count():
    """Return the number of GPU devices monitored by RSMI, or -1 on failure."""
    device_count = c_uint32(0)
    status = rocm_lib.rsmi_num_monitor_devices(byref(device_count))
    if not rsmi_ret_ok(status):
        return -1
    return device_count.value
def smi_get_device_name(dev):
    """Return the marketing name of GPU *dev*, or '' on failure."""
    name_buffer = create_string_buffer(RSMI_MAX_BUFFER_LENGTH)
    status = rocm_lib.rsmi_dev_name_get(dev, name_buffer, RSMI_MAX_BUFFER_LENGTH)
    if not rsmi_ret_ok(status):
        return ''
    return name_buffer.value.decode()
def smi_get_device_unique_id(dev):
    """Return the unique id of GPU *dev* as a 64-bit integer, or -1 on failure."""
    unique_id = c_uint64()
    status = rocm_lib.rsmi_dev_unique_id_get(dev, byref(unique_id))
    if not rsmi_ret_ok(status):
        return -1
    return unique_id.value
def smi_get_device_utilization(dev):
    """Return the busy percentage of GPU *dev*, or -1 on failure."""
    busy_pct = c_uint32()
    status = rocm_lib.rsmi_dev_busy_percent_get(dev, byref(busy_pct))
    if not rsmi_ret_ok(status):
        return -1
    return busy_pct.value
def smi_get_device_memory_used(dev, type='VRAM'):
    """Return used memory in bytes of GPU *dev* for memory *type*, or -1 on failure.

    NOTE: the parameter name `type` shadows the builtin; it is kept for
    backward compatibility with keyword callers. Valid values are the
    entries of memory_type_l ('VRAM', 'VIS_VRAM', 'GTT').
    """
    mem_type_index = memory_type_l.index(type)
    used_bytes = c_uint64()
    status = rocm_lib.rsmi_dev_memory_usage_get(dev, mem_type_index, byref(used_bytes))
    if not rsmi_ret_ok(status):
        return -1
    return used_bytes.value
def smi_get_device_memory_total(dev, type='VRAM'):
    """Return total memory in bytes of GPU *dev* for memory *type*, or -1 on failure.

    NOTE: the parameter name `type` shadows the builtin; it is kept for
    backward compatibility with keyword callers.
    """
    mem_type_index = memory_type_l.index(type)
    total_bytes = c_uint64()
    status = rocm_lib.rsmi_dev_memory_total_get(dev, mem_type_index, byref(total_bytes))
    if not rsmi_ret_ok(status):
        return -1
    return total_bytes.value
def smi_get_device_memory_busy(dev):
    """Return the percentage of time any memory of GPU *dev* is in use, or -1 on failure."""
    busy_pct = c_uint32()
    status = rocm_lib.rsmi_dev_memory_busy_percent_get(dev, byref(busy_pct))
    if not rsmi_ret_ok(status):
        return -1
    return busy_pct.value
def smi_get_device_memory_reserved_pages(dev):
    """Return (num_pages, records) describing reserved memory pages of GPU *dev*.

    Returns -1 on failure (note the inconsistent failure type is kept for
    backward compatibility).
    """
    page_count = c_uint32()
    page_records = rsmi_retired_page_record_t()
    status = rocm_lib.rsmi_dev_memory_reserved_pages_get(
        dev, byref(page_count), byref(page_records))
    if not rsmi_ret_ok(status):
        return -1
    return (page_count.value, page_records)
# PCIE functions
def smi_get_device_pcie_bandwidth(dev):
    """Return the possible PCIe bandwidths for GPU *dev* (rsmi_pcie_bandwidth_t), or -1 on failure."""
    bandwidth_info = rsmi_pcie_bandwidth_t()
    status = rocm_lib.rsmi_dev_pci_bandwidth_get(dev, byref(bandwidth_info))
    if not rsmi_ret_ok(status):
        return -1
    return bandwidth_info
def smi_get_device_pci_id(dev):
    """Return the unique PCI ID (BDFID) of GPU *dev* as a 64-bit integer, or -1 on failure.

    Layout: BDFID = ((DOMAIN & 0xffffffff) << 32) | ((BUS & 0xff) << 8) |
    ((DEVICE & 0x1f) << 3) | (FUNCTION & 0x7)
    """
    bdfid = c_uint64()
    status = rocm_lib.rsmi_dev_pci_id_get(dev, byref(bdfid))
    if not rsmi_ret_ok(status):
        return -1
    return bdfid.value
def smi_get_device_topo_numa_affinity(dev):
    """Return the NUMA node associated with GPU *dev*, or -1 on failure."""
    numa_node = c_uint32()
    # BUGFIX: was `reocm_lib` (typo) which raised NameError at runtime
    ret = rocm_lib.rsmi_topo_numa_affinity_get(dev, byref(numa_node))
    return numa_node.value if rsmi_ret_ok(ret) else -1
def smi_get_device_pcie_throughput(dev):
    """Return the measured PCIe throughput of GPU *dev* in bytes/sec, or -1 on failure.

    Throughput is computed as (sent packets + received packets) * max packet size.
    """
    sent_pkts = c_uint64()
    recv_pkts = c_uint64()
    max_pkt_size = c_uint64()
    status = rocm_lib.rsmi_dev_pci_throughput_get(
        dev, byref(sent_pkts), byref(recv_pkts), byref(max_pkt_size))
    if not rsmi_ret_ok(status):
        return -1
    return (recv_pkts.value + sent_pkts.value) * max_pkt_size.value
def smi_get_device_pci_replay_counter(dev):
    """Return the PCIe replay counter of GPU *dev*, or -1 on failure."""
    replay_counter = c_uint64()
    status = rocm_lib.rsmi_dev_pci_replay_counter_get(dev, byref(replay_counter))
    if not rsmi_ret_ok(status):
        return -1
    return replay_counter.value
# Compute partition functions
def smi_get_device_compute_partition(dev):
    """Return the compute partition of GPU *dev* as a string, or '' on failure."""
    partition_buffer = create_string_buffer(RSMI_MAX_BUFFER_LENGTH)
    status = rocm_lib.rsmi_dev_compute_partition_get(
        dev, byref(partition_buffer), RSMI_MAX_BUFFER_LENGTH)
    if not rsmi_ret_ok(status):
        return ''
    return partition_buffer.value.decode()
def smi_set_device_compute_partition(dev, partition):
    """Set the compute partition of GPU *dev*; return True on success."""
    status = rocm_lib.rsmi_dev_compute_partition_set(dev, partition)
    return rsmi_ret_ok(status)
def smi_reset_device_compute_partition(dev):
    """Revert the compute partition of GPU *dev* to its boot state; return True on success."""
    status = rocm_lib.rsmi_dev_compute_partition_reset(dev)
    return rsmi_ret_ok(status)
# Memory partition functions
def smi_get_device_memory_partition(dev):
    """Return the memory partition of GPU *dev* as a string, or '' on failure."""
    partition_buffer = create_string_buffer(RSMI_MAX_BUFFER_LENGTH)
    status = rocm_lib.rsmi_dev_memory_partition_get(
        dev, byref(partition_buffer), RSMI_MAX_BUFFER_LENGTH)
    if not rsmi_ret_ok(status):
        return ''
    return partition_buffer.value.decode()
def smi_set_device_memory_partition(dev, partition):
    """Set the memory partition of GPU *dev*; return True on success."""
    status = rocm_lib.rsmi_dev_memory_partition_set(dev, partition)
    return rsmi_ret_ok(status)
def smi_reset_device_memory_partition(dev):
    """Revert the memory partition of GPU *dev* to its boot state; return True on success."""
    status = rocm_lib.rsmi_dev_memory_partition_reset(dev)
    return rsmi_ret_ok(status)
# Hardware Topology functions
def smi_get_device_topo_numa_node_number(dev):
    """Return the NUMA node number associated with GPU *dev*, or -1 on failure."""
    node_number = c_uint32()
    status = rocm_lib.rsmi_topo_get_numa_node_number(dev, byref(node_number))
    if not rsmi_ret_ok(status):
        return -1
    return node_number.value
def smi_get_device_topo_link_weight(dev_src, dev_dst):
    """Return the weight of the IO link between two GPUs, or -1 on failure."""
    link_weight = c_uint64()
    status = rocm_lib.rsmi_topo_get_link_weight(dev_src, dev_dst, byref(link_weight))
    if not rsmi_ret_ok(status):
        return -1
    return link_weight.value
def smi_get_device_minmax_bandwidth(dev_src, dev_dst):
    """Return (min, max) IO link bandwidth between two GPUs, or -1 on failure.

    The underlying API only works when src and dst are connected via XGMI
    and are exactly one hop apart; the assert (kept for backward-compatible
    AssertionError behavior) enforces that precondition.
    """
    assert smi_get_device_link_type(dev_src, dev_dst)[0] == 1, 'Devices must be 1 hop away'
    bw_min = c_uint64()
    bw_max = c_uint64()
    status = rocm_lib.rsmi_minmax_bandwidth_get(
        dev_src, dev_dst, byref(bw_min), byref(bw_max))
    if not rsmi_ret_ok(status):
        return -1
    return (bw_min.value, bw_max.value)
def smi_get_device_link_type(dev_src, dev_dst):
    """Return (hops, link_type) between two GPUs, or -1 on failure."""
    hop_count = c_uint64()
    io_link_type = rsmi_io_link_type()
    status = rocm_lib.rsmi_topo_get_link_type(
        dev_src, dev_dst, byref(hop_count), byref(io_link_type))
    if not rsmi_ret_ok(status):
        return -1
    return (hop_count.value, io_link_type.value)
def smi_is_device_p2p_accessible(dev_src, dev_dst):
    """Return True if the two GPUs are P2P accessible, or -1 on failure."""
    p2p_accessible = c_bool()
    status = rocm_lib.rsmi_is_P2P_accessible(dev_src, dev_dst, byref(p2p_accessible))
    if not rsmi_ret_ok(status):
        return -1
    return p2p_accessible.value
def smi_get_device_compute_process():
    """Return a list of process ids running compute on the system ([] on failure)."""
    proc_count = c_uint32()
    # first call with a NULL buffer only queries how many processes there are
    status = rocm_lib.rsmi_compute_process_info_get(None, byref(proc_count))
    if not rsmi_ret_ok(status):
        return []
    # over-allocate slightly in case processes started since the first call
    capacity = proc_count.value + 10
    proc_buffer = (rsmi_process_info_t * capacity)()
    status = rocm_lib.rsmi_compute_process_info_get(byref(proc_buffer), byref(proc_count))
    if not rsmi_ret_ok(status):
        return []
    return [proc_buffer[i].process_id for i in range(proc_count.value)]
def smi_get_device_average_power(dev):
    """Return the average power draw of GPU *dev* in watts, or -1 on failure."""
    power_uw = c_uint32()
    status = rocm_lib.rsmi_dev_power_ave_get(dev, 0, byref(power_uw))
    if not rsmi_ret_ok(status):
        return -1
    # the library reports microwatts; convert to watts
    return power_uw.value * 1e-6
# XGMI functions
def smi_get_device_xgmi_error_status(dev):
    """Return the XGMI error status of GPU *dev* (rsmi_xgmi_status_t value), or -1 on failure."""
    error_status = rsmi_xgmi_status_t()
    status = rocm_lib.rsmi_dev_xgmi_error_status(dev, byref(error_status))
    if not rsmi_ret_ok(status):
        return -1
    return error_status.value
def smi_reset_device_xgmi_error(dev):
    """Reset the XGMI error status of GPU *dev*; return True on success."""
    status = rocm_lib.rsmi_dev_xgmi_error_reset(dev)
    return rsmi_ret_ok(status)
def smi_get_device_xgmi_hive_id(dev):
    """Return the XGMI hive ID of GPU *dev*, or -1 on failure."""
    xgmi_hive_id = c_uint64()
    status = rocm_lib.rsmi_dev_xgmi_hive_id_get(dev, byref(xgmi_hive_id))
    if not rsmi_ret_ok(status):
        return -1
    return xgmi_hive_id.value
# constants for the UUID function
B1 = '%02x'
B2 = B1 * 2
B4 = B1 * 4
B6 = B1 * 6
nv_fmt = 'GPU-{b4}-{b2}-{b2}-{b2}-{b6}'.format(b2=B2, b4=B4, b6=B6)
# UUID function
def smi_get_device_uuid(dev, format='roc'):
    """Return the UUID of device *dev*.

    @param dev: device index
    @param format: 'roc' -> 'GPU-<unique-id string>';
        'nv' -> NVIDIA-style 'GPU-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
        built from the ASCII codes of the id string.
        (the parameter name `format` shadows the builtin; kept for callers)
    @raises ValueError: for an unknown format
    @raises AssertionError: when *dev* is out of range
    """
    # BUGFIX: the docstring previously appeared after the first statement,
    # so it was not actually a docstring. Fetch lazily so the list reflects
    # the devices visible right now.
    DEVICE_UUIDS = get_device_uuids()
    assert dev < len(DEVICE_UUIDS), 'Device index out of range'
    u_s = DEVICE_UUIDS[dev]
    if format == 'roc':
        # use hex strings
        return 'GPU-{}'.format(u_s)
    elif format == 'nv':
        # break down to ASCII strings according to the format
        b_a = bytearray()
        b_a.extend(map(ord, u_s))
        return nv_fmt % tuple(b_a)
    else:
        raise ValueError('Invalid format: \'{}\'; use \'roc\' or \'nv\''.format(format))

View File

@@ -34,7 +34,13 @@ def get_public_ip():
:return: A string representing the IP of this machine or `None` if getting the IP failed
"""
for external_service in ["https://api.ipify.org", "https://ident.me"]:
from ..config import config_obj
# todo: add documentation in api section in conf file
public_ip_service_urls = (
config_obj.get("api.public_ip_service_urls", None)
or ["https://api.ipify.org", "https://ident.me"]
)
for external_service in public_ip_service_urls:
ip = get_public_ip_from_external_service(external_service)
if ip:
return ip
@@ -72,10 +78,14 @@ def get_public_ip_from_external_service(external_service, timeout=5):
def _get_private_ip_from_socket():
from ..config import config_obj
# todo: add documentation in api section in conf file
public_ip_ping = config_obj.get("api.public_ip_ping", None) or "8.8.8.8"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
s.connect(("8.8.8.8", 1))
s.connect((public_ip_ping, 1))
ip = s.getsockname()[0]
except Exception as e:
raise e

View File

@@ -39,6 +39,9 @@ class ProxyDictPostWrite(dict):
a_dict[k] = i
return a_dict
def to_dict(self):
return self._to_dict()
def update(self, E=None, **F):
res = self._do_update(E, **F)
self._set_callback()

View File

@@ -12,6 +12,7 @@ from typing import Text
from .process.mp import BackgroundMonitor
from ..backend_api import Session
from ..binding.frameworks.tensorflow_bind import IsTensorboardInit
from ..config import config
try:
from .gpu import gpustat
@@ -22,17 +23,47 @@ except ImportError:
class ResourceMonitor(BackgroundMonitor):
_title_machine = ':monitor:machine'
_title_gpu = ':monitor:gpu'
_first_report_sec_default = 30.0
_wait_for_first_iteration_to_start_sec_default = 180.0
_max_wait_for_first_iteration_to_start_sec_default = 1800.0
_resource_monitor_instances = []
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0,
max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True):
first_report_sec=None, wait_for_first_iteration_to_start_sec=None,
max_wait_for_first_iteration_to_start_sec=None, report_mem_used_per_process=True):
super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
# noinspection PyProtectedMember
ResourceMonitor._resource_monitor_instances.append(self)
self._task = task
self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec
self._first_report_sec = first_report_sec or report_frequency_sec
self.wait_for_first_iteration = wait_for_first_iteration_to_start_sec
self.max_check_first_iteration = max_wait_for_first_iteration_to_start_sec
# noinspection PyProtectedMember
self._first_report_sec = next(
value
# noinspection PyProtectedMember
for value in (first_report_sec, ResourceMonitor._first_report_sec_default, report_frequency_sec)
if value is not None
)
self.wait_for_first_iteration = next(
value
for value in (
wait_for_first_iteration_to_start_sec,
# noinspection PyProtectedMember
ResourceMonitor._wait_for_first_iteration_to_start_sec_default,
0.0
)
if value is not None
)
self.max_check_first_iteration = next(
value
for value in (
max_wait_for_first_iteration_to_start_sec,
# noinspection PyProtectedMember
ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default,
0.0
)
if value is not None
)
self._num_readouts = 0
self._readouts = {}
self._previous_readouts = {}
@@ -44,6 +75,17 @@ class ResourceMonitor(BackgroundMonitor):
self._last_process_pool = {}
self._last_process_id_list = []
self._gpu_memory_per_process = True
self._default_gpu_utilization = config.get("resource_monitoring.default_gpu_utilization", 100)
# allow default_gpu_utilization as null in the config, in which case we don't log anything
if self._default_gpu_utilization is not None:
self._default_gpu_utilization = int(self._default_gpu_utilization)
self._gpu_utilization_warning_sent = False
# noinspection PyBroadException
try:
self._debug_mode = bool(os.getenv("CLEARML_RESMON_DEBUG", ""))
except Exception:
self._debug_mode = False
if not self._gpustat:
self._task.get_logger().report_text('ClearML Monitor: GPU monitoring is not available')
@@ -247,8 +289,11 @@ class ResourceMonitor(BackgroundMonitor):
# something happened and we can't use gpu stats,
self._gpustat_fail += 1
if self._gpustat_fail >= 3:
self._task.get_logger().report_text('ClearML Monitor: GPU monitoring failed getting GPU reading, '
'switching off GPU monitoring')
msg = 'ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring'
if self._debug_mode:
import traceback
msg += "\n" + traceback.format_exc()
self._task.get_logger().report_text(msg)
self._gpustat = None
return stats
@@ -303,13 +348,18 @@ class ResourceMonitor(BackgroundMonitor):
return mem_size
def _skip_nonactive_gpu(self, idx, gpu):
def _skip_nonactive_gpu(self, gpu):
if not self._active_gpus:
return False
# noinspection PyBroadException
try:
uuid = getattr(gpu, "uuid", None)
return str(idx) not in self._active_gpus and (not uuid or uuid not in self._active_gpus)
mig_uuid = getattr(gpu, "mig_uuid", None)
return (
str(gpu.index) not in self._active_gpus
and (not uuid or uuid not in self._active_gpus)
and (not mig_uuid or mig_uuid not in self._active_gpus)
)
except Exception:
pass
return False
@@ -338,7 +388,7 @@ class ResourceMonitor(BackgroundMonitor):
self._gpu_memory_per_process = False
break
# only monitor the active gpu's, if none were selected, monitor everything
if self._skip_nonactive_gpu(i, g):
if self._skip_nonactive_gpu(g):
continue
gpu_mem[i] = 0
@@ -358,15 +408,32 @@ class ResourceMonitor(BackgroundMonitor):
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._skip_nonactive_gpu(i, g):
if self._skip_nonactive_gpu(g):
continue
stats["gpu_%d_temperature" % i] = float(g["temperature.gpu"])
stats["gpu_%d_utilization" % i] = float(g["utilization.gpu"])
if g["utilization.gpu"] is not None:
stats["gpu_%d_utilization" % i] = float(g["utilization.gpu"])
else:
stats["gpu_%d_utilization" % i] = self._default_gpu_utilization
if not self._gpu_utilization_warning_sent:
if g.mig_index is not None:
self._task.get_logger().report_text(
"Running inside MIG, Nvidia driver cannot export utilization, pushing fixed value {}".format( # noqa
self._default_gpu_utilization
)
)
else:
self._task.get_logger().report_text(
"Nvidia driver cannot export utilization, pushing fixed value {}".format(
self._default_gpu_utilization
)
)
self._gpu_utilization_warning_sent = True
stats["gpu_%d_mem_usage" % i] = 100. * float(g["memory.used"]) / float(g["memory.total"])
# already in MBs
stats["gpu_%d_mem_free_gb" % i] = float(g["memory.total"] - g["memory.used"]) / 1024
# use previously sampled process gpu memory, or global if it does not exist
stats["gpu_%d_mem_used_gb" % i] = float(gpu_mem[i] if gpu_mem else g["memory.used"]) / 1024
stats["gpu_%d_mem_used_gb" % i] = float(gpu_mem[i] if gpu_mem and i in gpu_mem else g["memory.used"]) / 1024
return stats
@@ -389,7 +456,7 @@ class ResourceMonitor(BackgroundMonitor):
if self._gpustat:
gpu_stat = self._gpustat.new_query(shutdown=True, get_driver_info=True)
if gpu_stat.gpus:
gpus = [g for i, g in enumerate(gpu_stat.gpus) if not self._skip_nonactive_gpu(i, g)]
gpus = [g for i, g in enumerate(gpu_stat.gpus) if not self._skip_nonactive_gpu(g)]
specs.update(
gpu_count=int(len(gpus)),
gpu_type=', '.join(g.name for g in gpus),

View File

@@ -1 +1 @@
__version__ = "1.14.4"
__version__ = "1.16.1"

View File

@@ -176,7 +176,7 @@ sdk {
vcs_repo_detect_async: true
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
# This stores "git diff" or into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff: true
store_code_diff_from_remote: false
@@ -228,6 +228,17 @@ sdk {
# compatibility feature, report memory usage for the entire machine
# default (false), report only on the running process and its sub-processes
report_global_mem_used: false
# if provided, start resource reporting after this amount of seconds
# report_start_sec: 30
# set the initial time (seconds) to wait for iteration reporting to be used as x-axis for the
# resource monitoring, if timeout exceeds then reverts to "seconds from start"
# wait_for_first_iteration_to_start_sec: 30
# set the maximum time (seconds) to allow the resource monitoring to revert back to
# iteration reporting x-axis after starting to report "seconds from start"
# max_wait_for_first_iteration_to_start_sec: 1800
}
}
}

View File

@@ -282,9 +282,6 @@
"new_task.update_parameters({\"General/max_depth\": 3})\n",
"# We can even rename it if we wanted\n",
"new_task.rename(f\"Cloned Task\")\n",
"# Make sure that the diff does not contain Colab invocation!\n",
"# cf. https://github.com/allegroai/clearml/issues/1204\n",
"new_task.set_script(diff=\"pass\")\n",
"# Now enqueue it for the colab worker to start working on it!\n",
"Task.enqueue(task=new_task, queue_name=\"default\")"
]

View File

@@ -1,4 +1,4 @@
tqdm==4.64.1
tqdm==4.66.3
clearml>=1.14.4
github3.py==3.2.0
tabulate==0.9.0

View File

@@ -106,12 +106,12 @@ task.set_model_label_enumeration(labels)
output_folder = os.path.join(tempfile.gettempdir(), 'keras_example')
board = TensorBoard(histogram_freq=1, log_dir=output_folder, write_images=False)
model_store = ModelCheckpoint(filepath=os.path.join(output_folder, 'weight.{epoch}.hdf5'))
model_store = ModelCheckpoint(filepath=os.path.join(output_folder, 'weight.{epoch}.keras'))
# load previous model, if it is there
# noinspection PyBroadException
try:
model.load_weights(os.path.join(output_folder, 'weight.1.hdf5'))
model.load_weights(os.path.join(output_folder, 'weight.1.keras'))
except Exception:
pass

View File

@@ -93,7 +93,7 @@ train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)
# Set up checkpoints manager
ckpt = tf.train.Checkpoint(step=tf.Variable(1), optimizer=optimizer, net=model)
ckpt = tf.train.Checkpoint(step=tf.Variable(1), net=model)
manager = tf.train.CheckpointManager(ckpt, os.path.join(gettempdir(), 'tf_ckpts'), max_to_keep=3)
ckpt.restore(manager.latest_checkpoint)
if manager.latest_checkpoint:
@@ -129,7 +129,13 @@ for epoch in range(EPOCHS):
test_accuracy.result()*100))
# Reset the metrics for the next epoch
train_loss.reset_states()
train_accuracy.reset_states()
test_loss.reset_states()
test_accuracy.reset_states()
try:
train_loss.reset_states()
train_accuracy.reset_states()
test_loss.reset_states()
test_accuracy.reset_states()
except AttributeError:
train_loss.reset_state()
train_accuracy.reset_state()
test_loss.reset_state()
test_accuracy.reset_state()

View File

@@ -70,6 +70,7 @@ setup(
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'License :: OSI Approved :: Apache Software License',
],
keywords='clearml trains development machine deep learning version control machine-learning machinelearning '